package org.apache.lucene.index;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 import java.io.Closeable;
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.nio.file.NoSuchFileException;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Locale;
35 import java.util.Map.Entry;
36 import java.util.Map;
37 import java.util.Queue;
38 import java.util.Set;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicLong;
41
42 import org.apache.lucene.analysis.Analyzer;
43 import org.apache.lucene.codecs.Codec;
44 import org.apache.lucene.codecs.FieldInfosFormat;
45 import org.apache.lucene.document.Field;
46 import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
47 import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
48 import org.apache.lucene.index.FieldInfos.FieldNumbers;
49 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
50 import org.apache.lucene.search.MatchAllDocsQuery;
51 import org.apache.lucene.search.Query;
52 import org.apache.lucene.store.AlreadyClosedException;
53 import org.apache.lucene.store.Directory;
54 import org.apache.lucene.store.FilterDirectory;
55 import org.apache.lucene.store.FlushInfo;
56 import org.apache.lucene.store.IOContext;
57 import org.apache.lucene.store.IndexOutput;
58 import org.apache.lucene.store.Lock;
59 import org.apache.lucene.store.LockObtainFailedException;
60 import org.apache.lucene.store.LockValidatingDirectoryWrapper;
61 import org.apache.lucene.store.MMapDirectory;
62 import org.apache.lucene.store.MergeInfo;
63 import org.apache.lucene.store.RateLimitedIndexOutput;
64 import org.apache.lucene.store.SleepingLockWrapper;
65 import org.apache.lucene.store.TrackingDirectoryWrapper;
66 import org.apache.lucene.util.Accountable;
67 import org.apache.lucene.util.Bits;
68 import org.apache.lucene.util.BytesRef;
69 import org.apache.lucene.util.CloseableThreadLocal;
70 import org.apache.lucene.util.Constants;
71 import org.apache.lucene.util.IOUtils;
72 import org.apache.lucene.util.InfoStream;
73 import org.apache.lucene.util.StringHelper;
74 import org.apache.lucene.util.ThreadInterruptedException;
75 import org.apache.lucene.util.Version;
197
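/**
 * An <code>IndexWriter</code> creates and maintains an index.
 *
 * <p>The {@link OpenMode} option on {@link IndexWriterConfig#setOpenMode(OpenMode)} determines
 * whether a new index is created, or whether an existing index is opened for the addition of new
 * documents.</p>
 *
 * <p>In either case, documents are added with {@link #addDocument(Iterable) addDocument} and
 * removed with {@link #deleteDocuments(Term...)} or {@link #deleteDocuments(Query...)}. A document
 * can be updated with {@link #updateDocument(Term, Iterable) updateDocument} (which just deletes
 * and then adds the entire document). When finished adding, deleting and updating documents,
 * {@link #close() close} should be called. A minimal usage sketch follows; the directory path,
 * analyzer choice and field name are illustrative, not prescribed:</p>
 *
 * <pre class="prettyprint">
 * Directory dir = FSDirectory.open(Paths.get("/path/to/index"));
 * IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
 * iwc.setOpenMode(OpenMode.CREATE_OR_APPEND);
 * try (IndexWriter writer = new IndexWriter(dir, iwc)) {
 *   Document doc = new Document();
 *   doc.add(new StringField("id", "1", Field.Store.YES));
 *   writer.addDocument(doc);                         // buffered in RAM
 *   writer.updateDocument(new Term("id", "1"), doc); // atomic delete + re-add
 *   writer.commit();                                 // make changes durable and visible
 * }
 * </pre>
 *
 * <p>Changes are buffered in memory and periodically flushed to the {@link Directory} (during the
 * above method calls); a flush is triggered by RAM usage or buffered document count (see
 * {@link IndexWriterConfig#setRAMBufferSizeMB} and {@link IndexWriterConfig#setMaxBufferedDocs}).
 * A flush does not commit: until {@link #commit()} (or a graceful {@link #close()}) the changes
 * are neither visible to readers opened on the directory nor safe against crashes.</p>
 *
 * <p>Opening an <code>IndexWriter</code> creates a lock file ({@link #WRITE_LOCK_NAME}) for the
 * directory in use; trying to open a second <code>IndexWriter</code> on the same directory throws
 * a {@link LockObtainFailedException}. Instances are completely thread safe, meaning multiple
 * threads can call any of its methods, concurrently.</p>
 */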
198 public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
204
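  /**
   * Hard limit on how many documents may be added to the index. If you try to add more than
   * this you'll hit <code>IllegalArgumentException</code>.
   */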
205 public static final int MAX_DOCS = Integer.MAX_VALUE - 128;
207
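  /** Maximum value of the token position in an indexed field. */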
208 public static final int MAX_POSITION = Integer.MAX_VALUE - 128;
209
210
211
212 private static int actualMaxDocs = MAX_DOCS;
213
214
215 static void setMaxDocs(int maxDocs) {
216 if (maxDocs > MAX_DOCS) {
217
218 throw new IllegalArgumentException("maxDocs must be <= IndexWriter.MAX_DOCS=" + MAX_DOCS + "; got: " + maxDocs);
219 }
220 IndexWriter.actualMaxDocs = maxDocs;
221 }
222
223 static int getActualMaxDocs() {
224 return IndexWriter.actualMaxDocs;
225 }
226
227
228 boolean enableTestPoints = false;
229
230 private static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
234
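  /** Name of the write lock in the index. */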
235 public static final String WRITE_LOCK_NAME = "write.lock";
236
237
238 public static final String SOURCE = "source";
239
240 public static final String SOURCE_MERGE = "merge";
241
242 public static final String SOURCE_FLUSH = "flush";
243
244 public static final String SOURCE_ADDINDEXES_READERS = "addIndexes(CodecReader...)";
253
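  /**
   * Absolute hard maximum length for a term, in bytes once encoded as UTF8. If a term arrives
   * from the analyzer longer than this length, an <code>IllegalArgumentException</code> is
   * thrown and a message is printed to <code>infoStream</code>, if set (see
   * {@link IndexWriterConfig#setInfoStream(InfoStream)}).
   */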
254 public final static int MAX_TERM_LENGTH = DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8;
255
256 volatile Throwable tragedy;
257
258 private final Directory directoryOrig;
259 private final Directory directory;
260 private final Directory mergeDirectory;
261 private final Analyzer analyzer;
262
263 private final AtomicLong changeCount = new AtomicLong();
264 private volatile long lastCommitChangeCount;
265
266 private List<SegmentCommitInfo> rollbackSegments;
267
268 volatile SegmentInfos pendingCommit;
269 volatile long pendingCommitChangeCount;
270
271 private Collection<String> filesToCommit;
272
273 final SegmentInfos segmentInfos;
274 final FieldNumbers globalFieldNumberMap;
275
276 private final DocumentsWriter docWriter;
277 private final Queue<Event> eventQueue;
278 final IndexFileDeleter deleter;
279
280
281 private Map<SegmentCommitInfo,Boolean> segmentsToMerge = new HashMap<>();
282 private int mergeMaxNumSegments;
283
284 private Lock writeLock;
285
286 private volatile boolean closed;
287 private volatile boolean closing;
288
289
290
291 private HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
292
293 private final MergeScheduler mergeScheduler;
294 private LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<>();
295 private Set<MergePolicy.OneMerge> runningMerges = new HashSet<>();
296 private List<MergePolicy.OneMerge> mergeExceptions = new ArrayList<>();
297 private long mergeGen;
298 private boolean stopMerges;
299 private boolean didMessageState;
300
301 final AtomicInteger flushCount = new AtomicInteger();
302 final AtomicInteger flushDeletesCount = new AtomicInteger();
303
304 final ReaderPool readerPool = new ReaderPool();
305 final BufferedUpdatesStream bufferedUpdatesStream;
315
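  // Write-once: flips to true the first time getReader() is called (or when reader pooling is
  // enabled via IndexWriterConfig#setReaderPooling), after which SegmentReader instances are
  // held open and reused for applying deletes, doing merges, and reopening NRT readers.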
316 private volatile boolean poolReaders;
317
318
319
320 private final LiveIndexWriterConfig config;
321
322
323
324 private long startCommitTime;
325
326
327
328
329
330
331 final AtomicLong pendingNumDocs = new AtomicLong();
332
333 final CloseableThreadLocal<MergeRateLimiter> rateLimiters = new CloseableThreadLocal<>();
334
335 DirectoryReader getReader() throws IOException {
336 return getReader(true);
337 }
396
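  /**
   * Expert: returns a readonly reader covering all committed as well as un-committed changes to
   * the index, flushing (but not committing) any buffered documents first. This provides "near
   * real-time" searching: changes made by this writer can be searched without closing it or
   * calling {@link #commit}.
   *
   * <p>The reader is reasonably cheap to obtain but not free, so it should be shared across many
   * searches and only reopened periodically. A sketch of the public entry points that delegate
   * here (real API names, assuming the Lucene 5.x <code>DirectoryReader</code> signatures; the
   * surrounding code is illustrative):</p>
   *
   * <pre class="prettyprint">
   * DirectoryReader reader = DirectoryReader.open(writer, true);  // applyAllDeletes=true
   * // ... search with reader ...
   * DirectoryReader newer = DirectoryReader.openIfChanged(reader, writer, true);
   * </pre>
   *
   * @param applyAllDeletes if true, buffered deletes are resolved and applied before returning
   *        the reader, which is more costly but makes those deletes visible
   */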
397 DirectoryReader getReader(boolean applyAllDeletes) throws IOException {
398 ensureOpen();
399
400 final long tStart = System.currentTimeMillis();
401
402 if (infoStream.isEnabled("IW")) {
403 infoStream.message("IW", "flush at getReader");
404 }
405
406
407
408 poolReaders = true;
409 DirectoryReader r = null;
410 doBeforeFlush();
411 boolean anyChanges = false;
412
419 boolean success2 = false;
420 try {
421 boolean success = false;
422 synchronized (fullFlushLock) {
423 try {
424 anyChanges = docWriter.flushAllThreads();
425 if (!anyChanges) {
426
427
428 flushCount.incrementAndGet();
429 }
430
431
432
433 synchronized(this) {
434 anyChanges |= maybeApplyDeletes(applyAllDeletes);
435 r = StandardDirectoryReader.open(this, segmentInfos, applyAllDeletes);
436 if (infoStream.isEnabled("IW")) {
437 infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
438 }
439 }
440 success = true;
441 } finally {
442
443 docWriter.finishFullFlush(this, success);
444 if (success) {
445 processEvents(false, true);
446 doAfterFlush();
447 } else {
448 if (infoStream.isEnabled("IW")) {
449 infoStream.message("IW", "hit exception during NRT reader");
450 }
451 }
452 }
453 }
454 if (anyChanges) {
455 maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
456 }
457 if (infoStream.isEnabled("IW")) {
458 infoStream.message("IW", "getReader took " + (System.currentTimeMillis() - tStart) + " msec");
459 }
460 success2 = true;
461 } catch (AbortingException | VirtualMachineError tragedy) {
462 tragicEvent(tragedy, "getReader");
463
464 return null;
465 } finally {
466 if (!success2) {
467 IOUtils.closeWhileHandlingException(r);
468 }
469 }
470 return r;
471 }
472
473 @Override
474 public final long ramBytesUsed() {
475 ensureOpen();
476 return docWriter.ramBytesUsed();
477 }
478
479 @Override
480 public Collection<Accountable> getChildResources() {
481 return Collections.emptyList();
482 }
490
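  /**
   * Holds shared SegmentReader instances. IndexWriter uses SegmentReaders for 1) applying
   * deletes, 2) doing merges, 3) handing out a real-time reader. This pool reuses instances of
   * the SegmentReaders in all these places if it is in "near real-time mode" (getReader() has
   * been called on this instance).
   */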
491 class ReaderPool implements Closeable {
492
493 private final Map<SegmentCommitInfo,ReadersAndUpdates> readerMap = new HashMap<>();
494
495
496 public synchronized boolean infoIsLive(SegmentCommitInfo info) {
497 int idx = segmentInfos.indexOf(info);
498 assert idx != -1: "info=" + info + " isn't live";
499 assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos";
500 return true;
501 }
502
503 public synchronized void drop(SegmentCommitInfo info) throws IOException {
504 final ReadersAndUpdates rld = readerMap.get(info);
505 if (rld != null) {
506 assert info == rld.info;
507
508 readerMap.remove(info);
509 rld.dropReaders();
510 }
511 }
512
513 public synchronized boolean anyPendingDeletes() {
514 for(ReadersAndUpdates rld : readerMap.values()) {
515 if (rld.getPendingDeleteCount() != 0) {
516 return true;
517 }
518 }
519
520 return false;
521 }
522
523 public synchronized void release(ReadersAndUpdates rld) throws IOException {
524 release(rld, true);
525 }
526
527 public synchronized void release(ReadersAndUpdates rld, boolean assertInfoLive) throws IOException {
528
529
530 rld.decRef();
531
532
533 assert rld.refCount() >= 1;
534
535 if (!poolReaders && rld.refCount() == 1) {
536
537
538
539 if (rld.writeLiveDocs(directory)) {
540
541 assert assertInfoLive == false || infoIsLive(rld.info);
542
549 checkpointNoSIS();
550 }
551
552
553
554 rld.dropReaders();
555 readerMap.remove(rld.info);
556 }
557 }
558
559 @Override
560 public void close() throws IOException {
561 dropAll(false);
562 }
563
564
565
566 synchronized void dropAll(boolean doSave) throws IOException {
567 Throwable priorE = null;
568 final Iterator<Map.Entry<SegmentCommitInfo,ReadersAndUpdates>> it = readerMap.entrySet().iterator();
569 while(it.hasNext()) {
570 final ReadersAndUpdates rld = it.next().getValue();
571
572 try {
573 if (doSave && rld.writeLiveDocs(directory)) {
574
575 assert infoIsLive(rld.info);
576
583 checkpointNoSIS();
584 }
585 } catch (Throwable t) {
586 if (doSave) {
587 IOUtils.reThrow(t);
588 } else if (priorE == null) {
589 priorE = t;
590 }
591 }
592
593
594
595
596
597 it.remove();
598
599
600
601
602
603 try {
604 rld.dropReaders();
605 } catch (Throwable t) {
606 if (doSave) {
607 IOUtils.reThrow(t);
608 } else if (priorE == null) {
609 priorE = t;
610 }
611 }
612 }
613 assert readerMap.size() == 0;
614 IOUtils.reThrow(priorE);
615 }
616
617
618
619
620
621
622
623 public synchronized void commit(SegmentInfos infos) throws IOException {
624 for (SegmentCommitInfo info : infos) {
625 final ReadersAndUpdates rld = readerMap.get(info);
626 if (rld != null) {
627 assert rld.info == info;
628 if (rld.writeLiveDocs(directory)) {
629
630 assert infoIsLive(info);
631
639 checkpointNoSIS();
640 }
641 }
642 }
643 }
644
645
646
647
648
649
650 public synchronized ReadersAndUpdates get(SegmentCommitInfo info, boolean create) {
651
652
653 ensureOpen(false);
654
655 assert info.info.dir == directoryOrig: "info.dir=" + info.info.dir + " vs " + directoryOrig;
656
657 ReadersAndUpdates rld = readerMap.get(info);
658 if (rld == null) {
659 if (!create) {
660 return null;
661 }
662 rld = new ReadersAndUpdates(IndexWriter.this, info);
663
664 readerMap.put(info, rld);
665 } else {
666 assert rld.info == info: "rld.info=" + rld.info + " info=" + info + " isLive?=" + infoIsLive(rld.info) + " vs " + infoIsLive(info);
667 }
668
669 if (create) {
670
671 rld.incRef();
672 }
673
674 assert noDups();
675
676 return rld;
677 }
678
679
680
681 private boolean noDups() {
682 Set<String> seen = new HashSet<>();
683 for(SegmentCommitInfo info : readerMap.keySet()) {
684 assert !seen.contains(info.info.name);
685 seen.add(info.info.name);
686 }
687 return true;
688 }
689 }
695
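  /**
   * Obtain the number of deleted docs for a pooled reader. If the reader isn't being pooled,
   * the segmentInfo's delCount is returned.
   */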
696 public int numDeletedDocs(SegmentCommitInfo info) {
697 ensureOpen(false);
698 int delCount = info.getDelCount();
699
700 final ReadersAndUpdates rld = readerPool.get(info, false);
701 if (rld != null) {
702 delCount += rld.getPendingDeleteCount();
703 }
704 return delCount;
705 }
717
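  /**
   * Used internally to throw an {@link AlreadyClosedException} if this IndexWriter has been
   * closed or is in the process of closing.
   *
   * @param failIfClosing if true, also fail when the writer is in the process of closing
   *        ({@code closing=true}) but not yet done ({@code closed=false})
   * @throws AlreadyClosedException if this IndexWriter is closed or in the process of closing
   */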
718 protected final void ensureOpen(boolean failIfClosing) throws AlreadyClosedException {
719 if (closed || (failIfClosing && closing)) {
720 throw new AlreadyClosedException("this IndexWriter is closed", tragedy);
721 }
722 }
732
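  /**
   * Used internally to throw an {@link AlreadyClosedException} if this IndexWriter has been
   * closed or is in the process of closing; equivalent to {@code ensureOpen(true)}.
   */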
733 protected final void ensureOpen() throws AlreadyClosedException {
734 ensureOpen(true);
735 }
736
737 final Codec codec;
760
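  /**
   * Constructs a new IndexWriter per the settings given in <code>conf</code>. If you want to
   * make "live" changes to this writer instance, use {@link #getConfig()}.
   *
   * <p><b>NOTE:</b> after this writer is created, the given configuration instance cannot be
   * passed to another writer.</p>
   *
   * @param d the index directory; the index is either created or appended to, according to
   *        {@link IndexWriterConfig#getOpenMode()}
   * @param conf the configuration settings according to which IndexWriter should be initialized
   * @throws IOException if the directory cannot be read/written to, or if it does not exist and
   *         <code>conf.getOpenMode()</code> is <code>OpenMode.APPEND</code>, or if there is any
   *         other low-level IO error
   */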
761 public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
762 conf.setIndexWriter(this);
763 config = conf;
764 infoStream = config.getInfoStream();
765
766
767
768 long timeout = config.getWriteLockTimeout();
769 final Directory lockDir;
770 if (timeout == 0) {
771
772 lockDir = d;
773 } else {
774 lockDir = new SleepingLockWrapper(d, timeout);
775 }
776 writeLock = lockDir.obtainLock(WRITE_LOCK_NAME);
777
778 boolean success = false;
779 try {
780 directoryOrig = d;
781 directory = new LockValidatingDirectoryWrapper(d, writeLock);
782
783
784
785 mergeDirectory = addMergeRateLimiters(directory);
786
787 analyzer = config.getAnalyzer();
788 mergeScheduler = config.getMergeScheduler();
789 mergeScheduler.setInfoStream(infoStream);
790 codec = config.getCodec();
791
792 bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
793 poolReaders = config.getReaderPooling();
794
795 OpenMode mode = config.getOpenMode();
796 boolean create;
797 if (mode == OpenMode.CREATE) {
798 create = true;
799 } else if (mode == OpenMode.APPEND) {
800 create = false;
801 } else {
802
803 create = !DirectoryReader.indexExists(directory);
804 }
805
806
807
808
809 boolean initialIndexExists = true;
810
811 String[] files = directory.listAll();
812
813
814 IndexCommit commit = config.getIndexCommit();
815
816
817 StandardDirectoryReader reader;
818 if (commit == null) {
819 reader = null;
820 } else {
821 reader = commit.getReader();
822 }
823
824 if (create) {
825
826 if (config.getIndexCommit() != null) {
827
828 if (mode == OpenMode.CREATE) {
829 throw new IllegalArgumentException("cannot use IndexWriterConfig.setIndexCommit() with OpenMode.CREATE");
830 } else {
831 throw new IllegalArgumentException("cannot use IndexWriterConfig.setIndexCommit() when index has no commit");
832 }
833 }
834
835
836
837
838
839 SegmentInfos sis = null;
840 try {
841 sis = SegmentInfos.readLatestCommit(directory);
842 sis.clear();
843 } catch (IOException e) {
844
845 initialIndexExists = false;
846 sis = new SegmentInfos();
847 }
848
849 segmentInfos = sis;
850
851 rollbackSegments = segmentInfos.createBackupSegmentInfos();
852
853
854
855 changed();
856
857 } else if (reader != null) {
858
859
860 if (reader.directory() != commit.getDirectory()) {
861 throw new IllegalArgumentException("IndexCommit's reader must have the same directory as the IndexCommit");
862 }
863
864 if (reader.directory() != directoryOrig) {
865 throw new IllegalArgumentException("IndexCommit's reader must have the same directory passed to IndexWriter");
866 }
867
868 if (reader.segmentInfos.getLastGeneration() == 0) {
869
870 throw new IllegalArgumentException("index must already have an initial commit to open from reader");
871 }
872
873
874 segmentInfos = reader.segmentInfos.clone();
875
876 SegmentInfos lastCommit;
877 try {
878 lastCommit = SegmentInfos.readCommit(directoryOrig, segmentInfos.getSegmentsFileName());
879 } catch (IOException ioe) {
880 throw new IllegalArgumentException("the provided reader is stale: its prior commit file \"" + segmentInfos.getSegmentsFileName() + "\" is missing from index");
881 }
882
883 if (reader.writer != null) {
884
885
886 assert reader.writer.closed;
887
888
889
890 segmentInfos.updateGenerationVersionAndCounter(reader.writer.segmentInfos);
891 lastCommit.updateGenerationVersionAndCounter(reader.writer.segmentInfos);
892 }
893
894 rollbackSegments = lastCommit.createBackupSegmentInfos();
895
896 if (infoStream.isEnabled("IW")) {
897 infoStream.message("IW", "init from reader " + reader);
898 messageState();
899 }
900 } else {
901
902
903 String lastSegmentsFile = SegmentInfos.getLastCommitSegmentsFileName(files);
904 if (lastSegmentsFile == null) {
905 throw new IndexNotFoundException("no segments* file found in " + directory + ": files: " + Arrays.toString(files));
906 }
907
908
909
910 segmentInfos = SegmentInfos.readCommit(directoryOrig, lastSegmentsFile);
911
912 if (commit != null) {
913
914
915
916
917
918 if (commit.getDirectory() != directoryOrig) {
919 throw new IllegalArgumentException("IndexCommit's directory doesn't match my directory, expected=" + directoryOrig + ", got=" + commit.getDirectory());
920 }
921
922 SegmentInfos oldInfos = SegmentInfos.readCommit(directoryOrig, commit.getSegmentsFileName());
923 segmentInfos.replace(oldInfos);
924 changed();
925
926 if (infoStream.isEnabled("IW")) {
927 infoStream.message("IW", "init: loaded commit \"" + commit.getSegmentsFileName() + "\"");
928 }
929 }
930
931 rollbackSegments = segmentInfos.createBackupSegmentInfos();
932 }
933
934 pendingNumDocs.set(segmentInfos.totalMaxDoc());
935
936
937
938 globalFieldNumberMap = getFieldNumberMap();
939
940 config.getFlushPolicy().init(config);
941 docWriter = new DocumentsWriter(this, config, directoryOrig, directory);
942 eventQueue = docWriter.eventQueue();
943
944
945
946
947
948 synchronized(this) {
949 deleter = new IndexFileDeleter(files, directoryOrig, directory,
950 config.getIndexDeletionPolicy(),
951 segmentInfos, infoStream, this,
952 initialIndexExists, reader != null);
953
954
955 assert create || filesExist(segmentInfos);
956 }
957
958 if (deleter.startingCommitDeleted) {
959
960
961
962
963 changed();
964 }
965
966 if (reader != null) {
967
968
969
970 List<LeafReaderContext> leaves = reader.leaves();
971 assert segmentInfos.size() == leaves.size();
972
973 for (int i=0;i<leaves.size();i++) {
974 LeafReaderContext leaf = leaves.get(i);
975 SegmentReader segReader = (SegmentReader) leaf.reader();
976 SegmentReader newReader = new SegmentReader(segmentInfos.info(i), segReader, segReader.getLiveDocs(), segReader.numDocs());
977 readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(this, newReader));
978 }
979
980
981 segmentInfos.changed();
982 changed();
983 }
984
985 if (infoStream.isEnabled("IW")) {
986 infoStream.message("IW", "init: create=" + create);
987 messageState();
988 }
989
990 success = true;
991
992 } finally {
993 if (!success) {
994 if (infoStream.isEnabled("IW")) {
995 infoStream.message("IW", "init: hit exception on init; releasing write lock");
996 }
997 IOUtils.closeWhileHandlingException(writeLock);
998 writeLock = null;
999 }
1000 }
1001 }
1002
1003
1004
1005
1006 static FieldInfos readFieldInfos(SegmentCommitInfo si) throws IOException {
1007 Codec codec = si.info.getCodec();
1008 FieldInfosFormat reader = codec.fieldInfosFormat();
1009
1010 if (si.hasFieldUpdates()) {
1011
1012 final String segmentSuffix = Long.toString(si.getFieldInfosGen(), Character.MAX_RADIX);
1013 return reader.read(si.info.dir, si.info, segmentSuffix, IOContext.READONCE);
1014 } else if (si.info.getUseCompoundFile()) {
1015
1016 try (Directory cfs = codec.compoundFormat().getCompoundReader(si.info.dir, si.info, IOContext.DEFAULT)) {
1017 return reader.read(cfs, si.info, "", IOContext.READONCE);
1018 }
1019 } else {
1020
1021 return reader.read(si.info.dir, si.info, "", IOContext.READONCE);
1022 }
1023 }
1024
1025
1026
1027
1028
1029 private FieldNumbers getFieldNumberMap() throws IOException {
1030 final FieldNumbers map = new FieldNumbers();
1031
1032 for(SegmentCommitInfo info : segmentInfos) {
1033 FieldInfos fis = readFieldInfos(info);
1034 for(FieldInfo fi : fis) {
1035 map.addOrGet(fi.name, fi.number, fi.getDocValuesType());
1036 }
1037 }
1038
1039 return map;
1040 }
1045
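  /**
   * Returns a {@link LiveIndexWriterConfig}, which can be used to query the IndexWriter's
   * current settings, as well as modify "live" ones.
   */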
1046 public LiveIndexWriterConfig getConfig() {
1047 ensureOpen(false);
1048 return config;
1049 }
1050
1051 private void messageState() {
1052 if (infoStream.isEnabled("IW") && didMessageState == false) {
1053 didMessageState = true;
1054 infoStream.message("IW", "\ndir=" + directoryOrig + "\n" +
1055 "index=" + segString() + "\n" +
1056 "version=" + Version.LATEST.toString() + "\n" +
1057 config.toString());
1058 infoStream.message("IW", "MMapDirectory.UNMAP_SUPPORTED=" + MMapDirectory.UNMAP_SUPPORTED);
1059 }
1060 }
1061
1068 private void shutdown() throws IOException {
1069 if (pendingCommit != null) {
1070 throw new IllegalStateException("cannot close: prepareCommit was already called with no corresponding call to commit");
1071 }
1072
1073
1074 if (shouldClose(true)) {
1075 boolean success = false;
1076 try {
1077 if (infoStream.isEnabled("IW")) {
1078 infoStream.message("IW", "now flush at close");
1079 }
1080 flush(true, true);
1081 waitForMerges();
1082 commitInternal(config.getMergePolicy());
1083 rollbackInternal();
1084 success = true;
1085 } finally {
1086 if (success == false) {
1087
1088 try {
1089 rollbackInternal();
1090 } catch (Throwable t) {
1091
1092 }
1093 }
1094 }
1095 }
1096 }
1121
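  /**
   * Closes all open resources and releases the write lock. If
   * {@link IndexWriterConfig#setCommitOnClose commitOnClose} is <code>true</code> (the
   * default), this first gracefully shuts down: it flushes pending changes, waits for running
   * merges, and commits; otherwise it rolls back all changes since the last commit. Note that
   * if a {@link #prepareCommit} is pending, closing throws {@code IllegalStateException} and
   * the writer stays open, and that you must ensure no other threads are still making changes
   * while this method runs.
   */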
1122 @Override
1123 public void close() throws IOException {
1124 if (config.getCommitOnClose()) {
1125 shutdown();
1126 } else {
1127 rollback();
1128 }
1129 }
1130
1131
1132
1133
1134 synchronized private boolean shouldClose(boolean waitForClose) {
1135 while (true) {
1136 if (closed == false) {
1137 if (closing == false) {
1138
1139 closing = true;
1140 return true;
1141 } else if (waitForClose == false) {
1142 return false;
1143 } else {
1144
1145
1146
1147 doWait();
1148 }
1149 } else {
1150 return false;
1151 }
1152 }
1153 }
1154
1155
1156 public Directory getDirectory() {
1157
1158 return directoryOrig;
1159 }
1160
1161
1162 public Analyzer getAnalyzer() {
1163 ensureOpen();
1164 return analyzer;
1165 }
1170
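  /**
   * Returns total number of docs in this index, including docs not yet flushed (still in the
   * RAM buffer), not counting deletions.
   *
   * @see #numDocs
   */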
1171 public synchronized int maxDoc() {
1172 ensureOpen();
1173 return docWriter.getNumDocs() + segmentInfos.totalMaxDoc();
1174 }
1181
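  /**
   * Returns total number of docs in this index, including docs not yet flushed (still in the
   * RAM buffer), and excluding docs already marked deleted in flushed segments.
   * <b>NOTE:</b> buffered deletions are not counted; if you really need those to be counted
   * you should call {@link #commit()} first.
   *
   * @see #maxDoc
   */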
1182 public synchronized int numDocs() {
1183 ensureOpen();
1184 int count = docWriter.getNumDocs();
1185 for (final SegmentCommitInfo info : segmentInfos) {
1186 count += info.info.maxDoc() - numDeletedDocs(info);
1187 }
1188 return count;
1189 }
1197
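  /**
   * Returns true if this index has deletions, including deletions that are buffered in RAM but
   * not yet applied. Note this may return true even if the buffered Term/Query deletions turn
   * out not to match any documents.
   */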
1198 public synchronized boolean hasDeletions() {
1199 ensureOpen();
1200 if (bufferedUpdatesStream.any()) {
1201 return true;
1202 }
1203 if (docWriter.anyDeletions()) {
1204 return true;
1205 }
1206 if (readerPool.anyPendingDeletes()) {
1207 return true;
1208 }
1209 for (final SegmentCommitInfo info : segmentInfos) {
1210 if (info.hasDeletions()) {
1211 return true;
1212 }
1213 }
1214 return false;
1215 }
1254
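  /**
   * Adds a document to this index.
   *
   * <p>Note that if an Exception is hit (for example disk full) then the index will be
   * consistent, but this document may not have been added. Furthermore, it's possible the index
   * will have one segment in non-compound format even when using compound files (when a merge
   * has partially succeeded).</p>
   *
   * <p>This method periodically flushes pending documents to the Directory (see the class
   * javadoc), and also periodically triggers segment merges in the index according to the
   * {@link MergePolicy} in use.</p>
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */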
1255 public void addDocument(Iterable<? extends IndexableField> doc) throws IOException {
1256 updateDocument(null, doc);
1257 }
1295
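  /**
   * Atomically adds a block of documents with sequentially assigned document IDs, such that an
   * external reader will see all or none of the documents.
   *
   * <p><b>WARNING</b>: the index does not currently record which documents were added as a
   * block; today this is fine because merging will preserve a block, but it's possible that in
   * the future this won't hold.</p>
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */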
1296 public void addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
1297 updateDocuments(null, docs);
1298 }
1312
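  /**
   * Atomically deletes documents matching the provided <code>delTerm</code> and adds a block
   * of documents with sequentially assigned document IDs, such that an external reader will
   * see all or none of the documents. See {@link #addDocuments(Iterable)}.
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */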
1313 public void updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
1314 ensureOpen();
1315 try {
1316 boolean success = false;
1317 try {
1318 if (docWriter.updateDocuments(docs, analyzer, delTerm)) {
1319 processEvents(true, false);
1320 }
1321 success = true;
1322 } finally {
1323 if (!success) {
1324 if (infoStream.isEnabled("IW")) {
1325 infoStream.message("IW", "hit exception updating document");
1326 }
1327 }
1328 }
1329 } catch (AbortingException | VirtualMachineError tragedy) {
1330 tragicEvent(tragedy, "updateDocuments");
1331 }
1332 }
1346
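  /**
   * Expert: attempts to delete by document ID, as long as the provided <code>readerIn</code>
   * is a near-real-time reader obtained from this writer (or a SegmentReader leaf of one). If
   * the segment has not yet been merged away, the delete succeeds and this method returns
   * true; else it returns false and the caller must then separately delete by Term or Query.
   *
   * <p><b>NOTE</b>: this only deletes documents visible to the currently open NRT reader; for
   * documents indexed afterwards you must use {@link #deleteDocuments(Term...)}.</p>
   */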
1347 public synchronized boolean tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
1348
1349 final LeafReader reader;
1350 if (readerIn instanceof LeafReader) {
1351
1352 reader = (LeafReader) readerIn;
1353 } else {
1354
1355 List<LeafReaderContext> leaves = readerIn.leaves();
1356 int subIndex = ReaderUtil.subIndex(docID, leaves);
1357 reader = leaves.get(subIndex).reader();
1358 docID -= leaves.get(subIndex).docBase;
1359 assert docID >= 0;
1360 assert docID < reader.maxDoc();
1361 }
1362
1363 if (!(reader instanceof SegmentReader)) {
1364 throw new IllegalArgumentException("the reader must be a SegmentReader or composite reader containing only SegmentReaders");
1365 }
1366
1367 final SegmentCommitInfo info = ((SegmentReader) reader).getSegmentInfo();
1368
1374 if (segmentInfos.indexOf(info) != -1) {
1375 ReadersAndUpdates rld = readerPool.get(info, false);
1376 if (rld != null) {
1377 synchronized(bufferedUpdatesStream) {
1378 rld.initWritableLiveDocs();
1379 if (rld.delete(docID)) {
1380 final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
1381 if (fullDelCount == rld.info.info.maxDoc()) {
1382
1383
1384
1385
1386 if (!mergingSegments.contains(rld.info)) {
1387 segmentInfos.remove(rld.info);
1388 readerPool.drop(rld.info);
1389 checkpoint();
1390 }
1391 }
1392
1393
1394
1395 changed();
1396 }
1397
1398 return true;
1399 }
1400 } else {
1401
1402 }
1403 } else {
1404
1405 }
1406 return false;
1407 }
1418
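  /**
   * Deletes the document(s) containing any of the terms. All given deletes are applied and
   * flushed atomically at the same time.
   *
   * @param terms array of terms to identify the documents to be deleted
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */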
1419 public void deleteDocuments(Term... terms) throws IOException {
1420 ensureOpen();
1421 try {
1422 if (docWriter.deleteTerms(terms)) {
1423 processEvents(true, false);
1424 }
1425 } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "deleteDocuments(Term...)");
1427 }
1428 }
1438
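  /**
   * Deletes the document(s) matching any of the provided queries. All given deletes are
   * applied and flushed atomically at the same time. As an optimization, a
   * {@link MatchAllDocsQuery} delete is rewritten to {@link #deleteAll()}.
   *
   * @param queries array of queries to identify the documents to be deleted
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */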
1439 public void deleteDocuments(Query... queries) throws IOException {
1440 ensureOpen();
1441
1442
1443 for(Query query : queries) {
1444 if (query.getClass() == MatchAllDocsQuery.class) {
1445 deleteAll();
1446 return;
1447 }
1448 }
1449
1450 try {
1451 if (docWriter.deleteQueries(queries)) {
1452 processEvents(true, false);
1453 }
1454 } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "deleteDocuments(Query...)");
1456 }
1457 }
1471
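  /**
   * Updates a document by first deleting the document(s) containing <code>term</code> and then
   * adding the new document. The delete and then add are atomic as seen by a reader on the
   * same index (flush may happen only after the add).
   *
   * @param term the term to identify the document(s) to be deleted
   * @param doc the document to be added
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */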
1472 public void updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
1473 ensureOpen();
1474 try {
1475 boolean success = false;
1476 try {
1477 if (docWriter.updateDocument(doc, analyzer, term)) {
1478 processEvents(true, false);
1479 }
1480 success = true;
1481 } finally {
1482 if (!success) {
1483 if (infoStream.isEnabled("IW")) {
1484 infoStream.message("IW", "hit exception updating document");
1485 }
1486 }
1487 }
1488 } catch (AbortingException | VirtualMachineError tragedy) {
1489 tragicEvent(tragedy, "updateDocument");
1490 }
1491 }
1508
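  /**
   * Updates a document's {@link NumericDocValues} for <code>field</code> to the given
   * <code>value</code>. You can only update fields that already exist in the index, not add
   * new fields through this method. The update is applied (and flushed) atomically together
   * with other pending deletes/updates.
   *
   * @param term the term to identify the document(s) to be updated
   * @param field field name of the {@link NumericDocValues} field
   * @param value new value for the field
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */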
1509 public void updateNumericDocValue(Term term, String field, long value) throws IOException {
1510 ensureOpen();
1511 if (!globalFieldNumberMap.contains(field, DocValuesType.NUMERIC)) {
1512 throw new IllegalArgumentException("can only update existing numeric-docvalues fields!");
1513 }
1514 try {
1515 if (docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value))) {
1516 processEvents(true, false);
1517 }
1518 } catch (VirtualMachineError tragedy) {
1519 tragicEvent(tragedy, "updateNumericDocValue");
1520 }
1521 }
1542
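  /**
   * Updates a document's {@link BinaryDocValues} for <code>field</code> to the given
   * <code>value</code>. You can only update fields that already exist in the index, not add
   * new fields through this method; a <code>null</code> value is rejected.
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */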
1543 public void updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
1544 ensureOpen();
1545 if (value == null) {
1546 throw new IllegalArgumentException("cannot update a field to a null value: " + field);
1547 }
1548 if (!globalFieldNumberMap.contains(field, DocValuesType.BINARY)) {
1549 throw new IllegalArgumentException("can only update existing binary-docvalues fields!");
1550 }
1551 try {
1552 if (docWriter.updateDocValues(new BinaryDocValuesUpdate(term, field, value))) {
1553 processEvents(true, false);
1554 }
1555 } catch (VirtualMachineError tragedy) {
1556 tragicEvent(tragedy, "updateBinaryDocValue");
1557 }
1558 }
1572
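  /**
   * Updates documents' DocValues fields to the given values. Each field must already exist in
   * the index and be of type NUMERIC or BINARY doc values. All updates are atomically applied
   * and flushed together.
   *
   * @param term the term to identify the document(s) to be updated
   * @param updates the doc-values fields and new values to apply
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */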
1573 public void updateDocValues(Term term, Field... updates) throws IOException {
1574 ensureOpen();
1575 DocValuesUpdate[] dvUpdates = new DocValuesUpdate[updates.length];
1576 for (int i = 0; i < updates.length; i++) {
1577 final Field f = updates[i];
1578 final DocValuesType dvType = f.fieldType().docValuesType();
1579 if (dvType == null) {
1580 throw new NullPointerException("DocValuesType cannot be null (field: \"" + f.name() + "\")");
1581 }
1582 if (dvType == DocValuesType.NONE) {
1583 throw new IllegalArgumentException("can only update NUMERIC or BINARY fields! field=" + f.name());
1584 }
1585 if (!globalFieldNumberMap.contains(f.name(), dvType)) {
1586 throw new IllegalArgumentException("can only update existing docvalues fields! field=" + f.name() + ", type=" + dvType);
1587 }
1588 switch (dvType) {
1589 case NUMERIC:
1590 dvUpdates[i] = new NumericDocValuesUpdate(term, f.name(), (Long) f.numericValue());
1591 break;
1592 case BINARY:
1593 dvUpdates[i] = new BinaryDocValuesUpdate(term, f.name(), f.binaryValue());
1594 break;
1595 default:
1596 throw new IllegalArgumentException("can only update NUMERIC or BINARY fields: field=" + f.name() + ", type=" + dvType);
1597 }
1598 }
1599 try {
1600 if (docWriter.updateDocValues(dvUpdates)) {
1601 processEvents(true, false);
1602 }
1603 } catch (VirtualMachineError tragedy) {
1604 tragicEvent(tragedy, "updateDocValues");
1605 }
1606 }
1607
1608
1609 final synchronized int getSegmentCount(){
1610 return segmentInfos.size();
1611 }
1612
1613
1614 final synchronized int getNumBufferedDocuments(){
1615 return docWriter.getNumDocs();
1616 }
1617
1618
1619 final synchronized Collection<String> getIndexFileNames() throws IOException {
1620 return segmentInfos.files(true);
1621 }
1622
1623
1624 final synchronized int maxDoc(int i) {
1625 if (i >= 0 && i < segmentInfos.size()) {
1626 return segmentInfos.info(i).info.maxDoc();
1627 } else {
1628 return -1;
1629 }
1630 }
1631
1632
1633 final int getFlushCount() {
1634 return flushCount.get();
1635 }
1636
1637
1638 final int getFlushDeletesCount() {
1639 return flushDeletesCount.get();
1640 }
1641
1642 final String newSegmentName() {
1643
1644
1645 synchronized(segmentInfos) {
1646
1647
1648
1649
1650
1651 changeCount.incrementAndGet();
1652 segmentInfos.changed();
1653 return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX);
1654 }
1655 }
1656
1657
1658
1659 final InfoStream infoStream;
1719
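  /**
   * Forces merge policy to merge segments until there are {@code <= maxNumSegments}. The
   * actual merges to be executed are determined by the {@link MergePolicy}.
   *
   * <p>This is a horribly costly operation, especially when you pass a small
   * <code>maxNumSegments</code>; usually you should only call this if the index is static
   * (will no longer be changed). This call will merge those segments present in the index when
   * the call started; if other threads are still adding documents and flushing segments, those
   * newly created segments will not be merged unless you call forceMerge again.</p>
   *
   * @param maxNumSegments maximum number of segments left in the index after merging finishes
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   * @see MergePolicy#findForcedMerges
   */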
1720 public void forceMerge(int maxNumSegments) throws IOException {
1721 forceMerge(maxNumSegments, true);
1722 }
1729
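  /**
   * Just like {@link #forceMerge(int)}, except you can specify whether the call should block
   * until all merging completes. This is only meaningful with a {@link MergeScheduler} that is
   * able to run merges in background threads.
   */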
1730 public void forceMerge(int maxNumSegments, boolean doWait) throws IOException {
1731 ensureOpen();
1732
1733 if (maxNumSegments < 1)
1734 throw new IllegalArgumentException("maxNumSegments must be >= 1; got " + maxNumSegments);
1735
1736 if (infoStream.isEnabled("IW")) {
1737 infoStream.message("IW", "forceMerge: index now " + segString());
1738 infoStream.message("IW", "now flush at forceMerge");
1739 }
1740
1741 flush(true, true);
1742
1743 synchronized(this) {
1744 resetMergeExceptions();
1745 segmentsToMerge.clear();
1746 for(SegmentCommitInfo info : segmentInfos) {
1747 segmentsToMerge.put(info, Boolean.TRUE);
1748 }
1749 mergeMaxNumSegments = maxNumSegments;
1750
1751
1752
1753 for(final MergePolicy.OneMerge merge : pendingMerges) {
1754 merge.maxNumSegments = maxNumSegments;
1755 segmentsToMerge.put(merge.info, Boolean.TRUE);
1756 }
1757
1758 for (final MergePolicy.OneMerge merge: runningMerges) {
1759 merge.maxNumSegments = maxNumSegments;
1760 segmentsToMerge.put(merge.info, Boolean.TRUE);
1761 }
1762 }
1763
1764 maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT, maxNumSegments);
1765
1766 if (doWait) {
1767 synchronized(this) {
1768 while(true) {
1769
1770 if (tragedy != null) {
1771 throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMerge", tragedy);
1772 }
1773
1774 if (mergeExceptions.size() > 0) {
1775
1776
1777 final int size = mergeExceptions.size();
1778 for(int i=0;i<size;i++) {
1779 final MergePolicy.OneMerge merge = mergeExceptions.get(i);
1780 if (merge.maxNumSegments != -1) {
1781 throw new IOException("background merge hit exception: " + merge.segString(), merge.getException());
1782 }
1783 }
1784 }
1785
1786 if (maxNumSegmentsMergesPending())
1787 doWait();
1788 else
1789 break;
1790 }
1791 }
1792
1793
1794
1795
1796
1797 ensureOpen();
1798 }
1799
1800
1801
1802 }
1803
1804
1805
1806 private synchronized boolean maxNumSegmentsMergesPending() {
1807 for (final MergePolicy.OneMerge merge : pendingMerges) {
1808 if (merge.maxNumSegments != -1)
1809 return true;
1810 }
1811
1812 for (final MergePolicy.OneMerge merge : runningMerges) {
1813 if (merge.maxNumSegments != -1)
1814 return true;
1815 }
1816
1817 return false;
1818 }
1824
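  /**
   * Just like {@link #forceMergeDeletes()}, except you can specify whether the call should
   * block until the operation completes. This is only meaningful with a {@link MergeScheduler}
   * that is able to run merges in background threads.
   */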
1825 public void forceMergeDeletes(boolean doWait)
1826 throws IOException {
1827 ensureOpen();
1828
1829 flush(true, true);
1830
1831 if (infoStream.isEnabled("IW")) {
1832 infoStream.message("IW", "forceMergeDeletes: index now " + segString());
1833 }
1834
1835 final MergePolicy mergePolicy = config.getMergePolicy();
1836 MergePolicy.MergeSpecification spec;
1837 boolean newMergesFound = false;
1838 synchronized(this) {
1839 spec = mergePolicy.findForcedDeletesMerges(segmentInfos, this);
1840 newMergesFound = spec != null;
1841 if (newMergesFound) {
1842 final int numMerges = spec.merges.size();
1843 for(int i=0;i<numMerges;i++)
1844 registerMerge(spec.merges.get(i));
1845 }
1846 }
1847
1848 mergeScheduler.merge(this, MergeTrigger.EXPLICIT, newMergesFound);
1849
1850 if (spec != null && doWait) {
1851 final int numMerges = spec.merges.size();
1852 synchronized(this) {
1853 boolean running = true;
1854 while(running) {
1855
1856 if (tragedy != null) {
1857 throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMergeDeletes", tragedy);
1858 }
1859
1860
1861
1862
1863 running = false;
1864 for(int i=0;i<numMerges;i++) {
1865 final MergePolicy.OneMerge merge = spec.merges.get(i);
1866 if (pendingMerges.contains(merge) || runningMerges.contains(merge)) {
1867 running = true;
1868 }
1869 Throwable t = merge.getException();
1870 if (t != null) {
1871 throw new IOException("background merge hit exception: " + merge.segString(), t);
1872 }
1873 }
1874
1875
1876 if (running)
1877 doWait();
1878 }
1879 }
1880 }
1881
1882
1883
1884
1885 }
1906
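  /**
   * Forces merging of all segments that have deleted documents. The actual merges to be
   * executed are determined by the {@link MergePolicy}; for example, the default
   * {@link TieredMergePolicy} will only pick a segment if the percentage of deleted docs is
   * over 10%. This is often a horribly costly operation; rarely is it warranted. To see how
   * many deletions you have pending in your index, call {@link #numDocs}.
   *
   * <p><b>NOTE</b>: this method first flushes a new segment (if there are indexed documents),
   * and applies all buffered deletes.</p>
   */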
1907 public void forceMergeDeletes() throws IOException {
1908 forceMergeDeletes(true);
1909 }
1923
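  /**
   * Expert: asks the mergePolicy whether any merges are necessary now and if so, runs the
   * requested merges via the configured {@link MergeScheduler}. Explicit calls to maybeMerge()
   * are usually not necessary; the most common case is when merge policy parameters have
   * changed.
   */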
1924 public final void maybeMerge() throws IOException {
1925 maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
1926 }
1927
1928 private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
1929 ensureOpen(false);
1930 boolean newMergesFound = updatePendingMerges(mergePolicy, trigger, maxNumSegments);
1931 mergeScheduler.merge(this, trigger, newMergesFound);
1932 }
1933
1934 private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
1935 throws IOException {
1936
1937
1938
1939 messageState();
1940
1941 assert maxNumSegments == -1 || maxNumSegments > 0;
1942 assert trigger != null;
1943 if (stopMerges) {
1944 return false;
1945 }
1946
1947
1948 if (tragedy != null) {
1949 return false;
1950 }
1951 boolean newMergesFound = false;
1952 final MergePolicy.MergeSpecification spec;
1953 if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) {
      assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED :
          "Expected EXPLICIT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
1956 spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this);
1957 newMergesFound = spec != null;
1958 if (newMergesFound) {
1959 final int numMerges = spec.merges.size();
1960 for(int i=0;i<numMerges;i++) {
1961 final MergePolicy.OneMerge merge = spec.merges.get(i);
1962 merge.maxNumSegments = maxNumSegments;
1963 }
1964 }
1965 } else {
1966 spec = mergePolicy.findMerges(trigger, segmentInfos, this);
1967 }
1968 newMergesFound = spec != null;
1969 if (newMergesFound) {
1970 final int numMerges = spec.merges.size();
1971 for(int i=0;i<numMerges;i++) {
1972 registerMerge(spec.merges.get(i));
1973 }
1974 }
1975 return newMergesFound;
1976 }
1985
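  /**
   * Expert: to be used by a {@link MergePolicy} to avoid selecting merges for segments already
   * being merged. The returned collection is not cloned, and thus is only safe to access if
   * you hold IndexWriter's lock (which you do when IndexWriter invokes the MergePolicy).
   */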
1986 public synchronized Collection<SegmentCommitInfo> getMergingSegments() {
1987 return mergingSegments;
1988 }
1995
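  /**
   * Expert: the {@link MergeScheduler} calls this method to retrieve the next merge requested
   * by the MergePolicy.
   */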
1996 public synchronized MergePolicy.OneMerge getNextMerge() {
1997 if (pendingMerges.size() == 0) {
1998 return null;
1999 } else {
2000
2001 MergePolicy.OneMerge merge = pendingMerges.removeFirst();
2002 runningMerges.add(merge);
2003 return merge;
2004 }
2005 }
2011
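  /** Expert: returns true if there are merges waiting to be scheduled. */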
2012 public synchronized boolean hasPendingMerges() {
2013 return pendingMerges.size() != 0;
2014 }
2026
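  /**
   * Close the <code>IndexWriter</code> without committing any changes that have occurred since
   * the last commit (or since it was opened, if commit hasn't been called). This removes any
   * temporary files that had been created, after which the state of the index will be the same
   * as it was when commit() was last called or when this writer was first opened. This also
   * clears a previous call to {@link #prepareCommit}.
   *
   * @throws IOException if there is a low-level IO error
   */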
2027 @Override
2028 public void rollback() throws IOException {
2029
2030
2031
2032
2033 if (shouldClose(true)) {
2034 rollbackInternal();
2035 }
2036 }
2037
2038 private void rollbackInternal() throws IOException {
2039
2040 synchronized(commitLock) {
2041 rollbackInternalNoCommit();
2042 }
2043 }
2044
2045 private void rollbackInternalNoCommit() throws IOException {
2046 boolean success = false;
2047
2048 if (infoStream.isEnabled("IW")) {
2049 infoStream.message("IW", "rollback");
2050 }
2051
2052 try {
2053 abortMerges();
2054
2055 rateLimiters.close();
2056
2057 if (infoStream.isEnabled("IW")) {
2058 infoStream.message("IW", "rollback: done finish merges");
2059 }
2060
2061
2062
2063 mergeScheduler.close();
2064
2065 bufferedUpdatesStream.clear();
2066 docWriter.close();
2067 docWriter.abort(this);
2068 synchronized(this) {
2069
2070 if (pendingCommit != null) {
2071 pendingCommit.rollbackCommit(directory);
2072 try {
2073 deleter.decRef(pendingCommit);
2074 } finally {
2075 pendingCommit = null;
2076 notifyAll();
2077 }
2078 }
2079
2080
2081 readerPool.dropAll(false);
2082
2083
2084
2085
2086 segmentInfos.rollbackSegmentInfos(rollbackSegments);
2087
2088 if (infoStream.isEnabled("IW") ) {
2089 infoStream.message("IW", "rollback: infos=" + segString(segmentInfos));
2090 }
2091
2092 testPoint("rollback before checkpoint");
2093
2094
2095
2096
2097 if (tragedy == null) {
2098 deleter.checkpoint(segmentInfos, false);
2099 deleter.refresh();
2100 deleter.close();
2101 }
2102
2103 lastCommitChangeCount = changeCount.get();
2104
2105
2106
2107 closed = true;
2108
2109 IOUtils.close(writeLock);
2110 writeLock = null;
2111 }
2112
2113 success = true;
2114 } catch (VirtualMachineError tragedy) {
2115 tragicEvent(tragedy, "rollbackInternal");
2116 } finally {
2117 if (success == false) {
2118
2119
2120
2121 IOUtils.closeWhileHandlingException(mergeScheduler);
2122 }
2123 synchronized(this) {
2124 if (success == false) {
2125
2126
2127
2128 if (pendingCommit != null) {
2129 try {
2130 pendingCommit.rollbackCommit(directory);
2131 deleter.decRef(pendingCommit);
2132 } catch (Throwable t) {
2133 }
2134 pendingCommit = null;
2135 }
2136
2137
2138 IOUtils.closeWhileHandlingException(readerPool, deleter, writeLock);
2139 writeLock = null;
2140 }
2141 closed = true;
2142 closing = false;
2143
2144
2145 notifyAll();
2146 }
2147 }
2148 }
2176
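  /**
   * Delete all documents in the index.
   *
   * <p>This method will drop all buffered documents and will remove all segments from the
   * index. This change will not be visible until a {@link #commit()} has been called, and it
   * can be rolled back using {@link #rollback()}.</p>
   *
   * <p><b>NOTE</b>: this method is much faster than using
   * <code>deleteDocuments(new MatchAllDocsQuery())</code>, but with different semantics:
   * internal data structures are cleared, all segment information is forcefully dropped, and
   * the global field-number map is reset.</p>
   */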
2177 public void deleteAll() throws IOException {
2178 ensureOpen();
2179
2180 boolean success = false;
2181
2193 try {
2194 synchronized (fullFlushLock) {
2195 long abortedDocCount = docWriter.lockAndAbortAll(this);
2196 pendingNumDocs.addAndGet(-abortedDocCount);
2197
2198 processEvents(false, true);
2199 synchronized (this) {
2200 try {
2201
2202 abortMerges();
2203
2204 stopMerges = false;
2205
2206 pendingNumDocs.addAndGet(-segmentInfos.totalMaxDoc());
2207 segmentInfos.clear();
2208
2209 deleter.checkpoint(segmentInfos, false);
2210
2217 readerPool.dropAll(false);
2218
2219 changeCount.incrementAndGet();
2220 segmentInfos.changed();
2221 globalFieldNumberMap.clear();
2222
2223 success = true;
2224 } finally {
2225 docWriter.unlockAllAfterAbortAll(this);
2226 if (!success) {
2227 if (infoStream.isEnabled("IW")) {
2228 infoStream.message("IW", "hit exception during deleteAll");
2229 }
2230 }
2231 }
2232 }
2233 }
2234 } catch (VirtualMachineError tragedy) {
2235 tragicEvent(tragedy, "deleteAll");
2236 }
2237 }
2238
2239
2240
2241
2242 private synchronized void abortMerges() {
2243
2244 stopMerges = true;
2245
2246
2247 for (final MergePolicy.OneMerge merge : pendingMerges) {
2248 if (infoStream.isEnabled("IW")) {
2249 infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
2250 }
2251 merge.rateLimiter.setAbort();
2252 mergeFinish(merge);
2253 }
2254 pendingMerges.clear();
2255
2256 for (final MergePolicy.OneMerge merge : runningMerges) {
2257 if (infoStream.isEnabled("IW")) {
2258 infoStream.message("IW", "now abort running merge " + segString(merge.segments));
2259 }
2260 merge.rateLimiter.setAbort();
2261 }
2262
2263
2264
2265
2266 while (runningMerges.size() != 0) {
2267
2268 if (infoStream.isEnabled("IW")) {
2269 infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort");
2270 }
2271
2272 doWait();
2273 }
2274
2275 notifyAll();
2276 assert 0 == mergingSegments.size();
2277
2278 if (infoStream.isEnabled("IW")) {
2279 infoStream.message("IW", "all running merges have aborted");
2280 }
2281 }
2288
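  /**
   * Wait for any currently outstanding merges to finish. It is guaranteed that any merges
   * started prior to calling this method will have completed once this method completes.
   */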
2289 void waitForMerges() throws IOException {
2290
2291
2292
2293
2294 mergeScheduler.merge(this, MergeTrigger.CLOSING, false);
2295
2296 synchronized (this) {
2297 ensureOpen(false);
2298 if (infoStream.isEnabled("IW")) {
2299 infoStream.message("IW", "waitForMerges");
2300 }
2301
2302 while (pendingMerges.size() > 0 || runningMerges.size() > 0) {
2303 doWait();
2304 }
2305
2306
2307 assert 0 == mergingSegments.size();
2308
2309 if (infoStream.isEnabled("IW")) {
2310 infoStream.message("IW", "waitForMerges done");
2311 }
2312 }
2313 }
2319
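  /**
   * Called whenever the SegmentInfos has been updated and the index files referenced exist
   * (correctly) in the index directory.
   */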
2320 synchronized void checkpoint() throws IOException {
2321 changed();
2322 deleter.checkpoint(segmentInfos, false);
2323 }
2328
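  /**
   * Checkpoints with IndexFileDeleter, so it's aware of new files, and increments changeCount,
   * so on close/commit we will write a new segments file, but does NOT bump
   * segmentInfos.changed.
   */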
2329 synchronized void checkpointNoSIS() throws IOException {
2330 changeCount.incrementAndGet();
2331 deleter.checkpoint(segmentInfos, false);
2332 }
2333
2334
2335 synchronized void changed() {
2336 changeCount.incrementAndGet();
2337 segmentInfos.changed();
2338 }
2339
2340 synchronized void publishFrozenUpdates(FrozenBufferedUpdates packet) {
2341 assert packet != null && packet.any();
2342 synchronized (bufferedUpdatesStream) {
2343 bufferedUpdatesStream.push(packet);
2344 }
2345 }
2350
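  /**
   * Atomically adds the segment-private delete packet and publishes the flushed segment's
   * SegmentInfo to the index writer.
   */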
2351 void publishFlushedSegment(SegmentCommitInfo newSegment,
2352 FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket) throws IOException {
2353 try {
2354 synchronized (this) {
2355
2356 ensureOpen(false);
2357 synchronized (bufferedUpdatesStream) {
2358 if (infoStream.isEnabled("IW")) {
2359 infoStream.message("IW", "publishFlushedSegment");
2360 }
2361
2362 if (globalPacket != null && globalPacket.any()) {
2363 bufferedUpdatesStream.push(globalPacket);
2364 }
2365
2366
2367 final long nextGen;
2368 if (packet != null && packet.any()) {
2369 nextGen = bufferedUpdatesStream.push(packet);
2370 } else {
2371
2372
2373 nextGen = bufferedUpdatesStream.getNextGen();
2374 }
2375 if (infoStream.isEnabled("IW")) {
2376 infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
2377 }
2378 newSegment.setBufferedDeletesGen(nextGen);
2379 segmentInfos.add(newSegment);
2380 checkpoint();
2381 }
2382 }
2383 } finally {
2384 flushCount.incrementAndGet();
2385 doAfterFlush();
2386 }
2387 }
2388
2389 private synchronized void resetMergeExceptions() {
2390 mergeExceptions = new ArrayList<>();
2391 mergeGen++;
2392 }
2393
2394 private void noDupDirs(Directory... dirs) {
2395 HashSet<Directory> dups = new HashSet<>();
2396 for(int i=0;i<dirs.length;i++) {
2397 if (dups.contains(dirs[i]))
2398 throw new IllegalArgumentException("Directory " + dirs[i] + " appears more than once");
2399 if (dirs[i] == directoryOrig)
2400 throw new IllegalArgumentException("Cannot add directory to itself");
2401 dups.add(dirs[i]);
2402 }
2403 }
2404
2405
2406
2407
2408 private List<Lock> acquireWriteLocks(Directory... dirs) throws IOException {
2409 List<Lock> locks = new ArrayList<>(dirs.length);
2410 for(int i=0;i<dirs.length;i++) {
2411 boolean success = false;
2412 try {
2413 Lock lock = dirs[i].obtainLock(WRITE_LOCK_NAME);
2414 locks.add(lock);
2415 success = true;
2416 } finally {
2417 if (success == false) {
2418
2419
2420 IOUtils.closeWhileHandlingException(locks);
2421 }
2422 }
2423 }
2424 return locks;
2425 }
2461
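  /**
   * Adds all segments from an array of indexes into this index.
   *
   * <p>This may be used to parallelize batch indexing: a large document collection can be
   * broken into sub-collections, each indexed in parallel on a different thread, process or
   * machine, and the complete index then created by merging the sub-collection indexes with
   * this method.</p>
   *
   * <p><b>NOTE:</b> this method acquires the write lock in each directory, to ensure that no
   * {@code IndexWriter} is currently open or tries to open while this is running.</p>
   *
   * <p>This method is transactional in how Exceptions are handled: it does not commit a new
   * segments_N file until all indexes are added. This means if an Exception occurs (for
   * example disk full), then either no indexes will have been added or they all will have
   * been.</p>
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   * @throws IllegalArgumentException if addIndexes would cause the index to exceed
   *         {@link #MAX_DOCS}
   */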
2462 public void addIndexes(Directory... dirs) throws IOException {
2463 ensureOpen();
2464
2465 noDupDirs(dirs);
2466
2467 List<Lock> locks = acquireWriteLocks(dirs);
2468
2469 boolean successTop = false;
2470
2471 try {
2472 if (infoStream.isEnabled("IW")) {
2473 infoStream.message("IW", "flush at addIndexes(Directory...)");
2474 }
2475
2476 flush(false, true);
2477
2478 List<SegmentCommitInfo> infos = new ArrayList<>();
2479
2480
2481 long totalMaxDoc = 0;
2482 List<SegmentInfos> commits = new ArrayList<>(dirs.length);
2483 for (Directory dir : dirs) {
2484 if (infoStream.isEnabled("IW")) {
2485 infoStream.message("IW", "addIndexes: process directory " + dir);
2486 }
2487 SegmentInfos sis = SegmentInfos.readLatestCommit(dir);
2488 totalMaxDoc += sis.totalMaxDoc();
2489 commits.add(sis);
2490 }
2491
2492
2493 testReserveDocs(totalMaxDoc);
2494
2495 boolean success = false;
2496 try {
2497 for (SegmentInfos sis : commits) {
2498 for (SegmentCommitInfo info : sis) {
2499 assert !infos.contains(info): "dup info dir=" + info.info.dir + " name=" + info.info.name;
2500
2501 String newSegName = newSegmentName();
2502
2503 if (infoStream.isEnabled("IW")) {
2504 infoStream.message("IW", "addIndexes: process segment origName=" + info.info.name + " newName=" + newSegName + " info=" + info);
2505 }
2506
2507 IOContext context = new IOContext(new FlushInfo(info.info.maxDoc(), info.sizeInBytes()));
2508
2509 FieldInfos fis = readFieldInfos(info);
2510 for(FieldInfo fi : fis) {
2511 globalFieldNumberMap.addOrGet(fi.name, fi.number, fi.getDocValuesType());
2512 }
2513 infos.add(copySegmentAsIs(info, newSegName, context));
2514 }
2515 }
2516 success = true;
2517 } finally {
2518 if (!success) {
2519 for(SegmentCommitInfo sipc : infos) {
2520
2521 deleteNewFiles(sipc.files());
2522 }
2523 }
2524 }
2525
2526 synchronized (this) {
2527 success = false;
2528 try {
2529 ensureOpen();
2530
2531
2532 reserveDocs(totalMaxDoc);
2533
2534 success = true;
2535 } finally {
2536 if (!success) {
2537 for(SegmentCommitInfo sipc : infos) {
2538
2539 deleteNewFiles(sipc.files());
2540 }
2541 }
2542 }
2543 segmentInfos.addAll(infos);
2544 checkpoint();
2545 }
2546
2547 successTop = true;
2548
2549 } catch (VirtualMachineError tragedy) {
2550 tragicEvent(tragedy, "addIndexes(Directory...)");
2551 } finally {
2552 if (successTop) {
2553 IOUtils.close(locks);
2554 } else {
2555 IOUtils.closeWhileHandlingException(locks);
2556 }
2557 }
2558 maybeMerge();
2559 }
2589
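  /**
   * Merges the provided indexes into this index. The provided readers are not closed, and
   * empty segments are dropped rather than added.
   *
   * <p>See {@link #addIndexes(Directory...)} for details on transactional semantics. Note that
   * this method merges all given readers in a single merge; if you intend to merge a large
   * number of readers, it may be better to call it more than once, each time with a small set
   * of readers.</p>
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   * @throws IllegalArgumentException if addIndexes would cause the index to exceed
   *         {@link #MAX_DOCS}
   */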
2590 public void addIndexes(CodecReader... readers) throws IOException {
2591 ensureOpen();
2592
2593
2594 long numDocs = 0;
2595
2596 try {
2597 if (infoStream.isEnabled("IW")) {
2598 infoStream.message("IW", "flush at addIndexes(CodecReader...)");
2599 }
2600 flush(false, true);
2601
2602 String mergedName = newSegmentName();
2603 for (CodecReader leaf : readers) {
2604 numDocs += leaf.numDocs();
2605 }
2606
2607
2608 testReserveDocs(numDocs);
2609
2610 final IOContext context = new IOContext(new MergeInfo((int) numDocs, -1, false, -1));
2611
2612
2613
2614 TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
2615
2616 SegmentInfo info = new SegmentInfo(directoryOrig, Version.LATEST, mergedName, -1,
2617 false, codec, Collections.<String,String>emptyMap(), StringHelper.randomId(), new HashMap<String,String>());
2618
2619 SegmentMerger merger = new SegmentMerger(Arrays.asList(readers), info, infoStream, trackingDir,
2620 globalFieldNumberMap,
2621 context);
2622
2623 rateLimiters.set(new MergeRateLimiter(null));
2624
2625 if (!merger.shouldMerge()) {
2626 return;
2627 }
2628
2629 merger.merge();
2630
2631 SegmentCommitInfo infoPerCommit = new SegmentCommitInfo(info, 0, -1L, -1L, -1L);
2632
2633 info.setFiles(new HashSet<>(trackingDir.getCreatedFiles()));
2634 trackingDir.getCreatedFiles().clear();
2635
2636 setDiagnostics(info, SOURCE_ADDINDEXES_READERS);
2637
2638 final MergePolicy mergePolicy = config.getMergePolicy();
2639 boolean useCompoundFile;
2640 synchronized(this) {
2641 if (stopMerges) {
2642
2643 deleteNewFiles(infoPerCommit.files());
2644 return;
2645 }
2646 ensureOpen();
2647 useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
2648 }
2649
2650
2651 if (useCompoundFile) {
2652 Collection<String> filesToDelete = infoPerCommit.files();
2653 TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory);
2654
2655
2656 try {
2657 createCompoundFile(infoStream, trackingCFSDir, info, context);
2658 } finally {
2659
2660
2661 deleteNewFiles(filesToDelete);
2662 }
2663 info.setUseCompoundFile(true);
2664 }
2665
2666
2667
2668
2669
2670 codec.segmentInfoFormat().write(trackingDir, info, context);
2671
2672 info.addFiles(trackingDir.getCreatedFiles());
2673
2674
2675 synchronized(this) {
2676 if (stopMerges) {
2677
2678 deleteNewFiles(infoPerCommit.files());
2679 return;
2680 }
2681 ensureOpen();
2682
2683
2684 reserveDocs(numDocs);
2685
2686 segmentInfos.add(infoPerCommit);
2687 checkpoint();
2688 }
2689 } catch (VirtualMachineError tragedy) {
2690 tragicEvent(tragedy, "addIndexes(CodecReader...)");
2691 }
2692 maybeMerge();
2693 }
2694
2695
2696 private SegmentCommitInfo copySegmentAsIs(SegmentCommitInfo info, String segName, IOContext context) throws IOException {
2697
2698
2699
2700 SegmentInfo newInfo = new SegmentInfo(directoryOrig, info.info.getVersion(), segName, info.info.maxDoc(),
2701 info.info.getUseCompoundFile(), info.info.getCodec(),
2702 info.info.getDiagnostics(), info.info.getId(), info.info.getAttributes());
2703 SegmentCommitInfo newInfoPerCommit = new SegmentCommitInfo(newInfo, info.getDelCount(), info.getDelGen(),
2704 info.getFieldInfosGen(), info.getDocValuesGen());
2705
2706 newInfo.setFiles(info.files());
2707
2708 boolean success = false;
2709
2710 Set<String> copiedFiles = new HashSet<>();
2711 try {
2712
2713 for (String file: info.files()) {
2714 final String newFileName = newInfo.namedForThisSegment(file);
2715
2716 assert !slowFileExists(directory, newFileName): "file \"" + newFileName + "\" already exists; newInfo.files=" + newInfo.files();
2717
2718 directory.copyFrom(info.info.dir, file, newFileName, context);
2719 copiedFiles.add(newFileName);
2720 }
2721 success = true;
2722 } finally {
2723 if (!success) {
2724
2725 deleteNewFiles(copiedFiles);
2726 }
2727 }
2728
2729 assert copiedFiles.equals(newInfoPerCommit.files());
2730
2731 return newInfoPerCommit;
2732 }
2738
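  /**
   * A hook for extending classes to execute operations after pending added and deleted
   * documents have been flushed to the Directory, but before the change is committed (new
   * segments_N file written).
   */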
2739 protected void doAfterFlush() throws IOException {}
2744
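  /**
   * A hook for extending classes to execute operations before pending added and deleted
   * documents are flushed to the Directory.
   */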
2745 protected void doBeforeFlush() throws IOException {}
2760
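  /**
   * Expert: prepare for commit. This does the first phase of 2-phase commit: it flushes
   * pending added and deleted docs, syncs the index files, and writes most of the next
   * segments_N file. After calling this you must call either {@link #commit()} to finish the
   * commit, or {@link #rollback()} to revert the commit and undo all changes done since the
   * writer was opened. You can also just call {@link #commit()} directly without a prior
   * prepareCommit, in which case commit performs both phases internally.
   */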
2761 @Override
2762 public final void prepareCommit() throws IOException {
2763 ensureOpen();
2764 prepareCommitInternal(config.getMergePolicy());
2765 }
2766
2767 private void prepareCommitInternal(MergePolicy mergePolicy) throws IOException {
2768 startCommitTime = System.nanoTime();
2769 synchronized(commitLock) {
2770 ensureOpen(false);
2771 if (infoStream.isEnabled("IW")) {
2772 infoStream.message("IW", "prepareCommit: flush");
2773 infoStream.message("IW", " index before flush " + segString());
2774 }
2775
2776 if (tragedy != null) {
2777 throw new IllegalStateException("this writer hit an unrecoverable error; cannot commit", tragedy);
2778 }
2779
2780 if (pendingCommit != null) {
2781 throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
2782 }
2783
2784 doBeforeFlush();
2785 testPoint("startDoFlush");
2786 SegmentInfos toCommit = null;
2787 boolean anySegmentsFlushed = false;
2788
2789
2790
2791
2792
2793 try {
2794
2795 synchronized (fullFlushLock) {
2796 boolean flushSuccess = false;
2797 boolean success = false;
2798 try {
2799 anySegmentsFlushed = docWriter.flushAllThreads();
2800 if (!anySegmentsFlushed) {
2801
2802
2803 flushCount.incrementAndGet();
2804 }
2805 processEvents(false, true);
2806 flushSuccess = true;
2807
2808 synchronized(this) {
2809 maybeApplyDeletes(true);
2810
2811 readerPool.commit(segmentInfos);
2812
2813 if (changeCount.get() != lastCommitChangeCount) {
2814
2815
2816
2817
2818 changeCount.incrementAndGet();
2819 segmentInfos.changed();
2820 }
2821
2827 toCommit = segmentInfos.clone();
2828
2829 pendingCommitChangeCount = changeCount.get();
2830
2836 filesToCommit = toCommit.files(false);
2837 deleter.incRef(filesToCommit);
2838 }
2839 success = true;
2840 } finally {
2841 if (!success) {
2842 if (infoStream.isEnabled("IW")) {
2843 infoStream.message("IW", "hit exception during prepareCommit");
2844 }
2845 }
2846
2847 docWriter.finishFullFlush(this, flushSuccess);
2848 doAfterFlush();
2849 }
2850 }
2851 } catch (AbortingException | VirtualMachineError tragedy) {
2852 tragicEvent(tragedy, "prepareCommit");
2853 }
2854
2855 boolean success = false;
2856 try {
2857 if (anySegmentsFlushed) {
2858 maybeMerge(mergePolicy, MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
2859 }
2860 startCommit(toCommit);
2861 success = true;
2862 } finally {
2863 if (!success) {
2864 synchronized (this) {
2865 if (filesToCommit != null) {
2866 deleter.decRefWhileHandlingException(filesToCommit);
2867 filesToCommit = null;
2868 }
2869 }
2870 }
2871 }
2872 }
2873 }
2884
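  /**
   * Sets the commit user data map, which is recorded in the next commit (the segments_N
   * file). Calling this bumps the change count, so the next commit will be written even if no
   * other changes are pending.
   */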
2885 public final synchronized void setCommitData(Map<String,String> commitUserData) {
2886 segmentInfos.setUserData(new HashMap<>(commitUserData));
2887 changeCount.incrementAndGet();
2888 }
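  // Usage sketch (illustrative only): applications often record their own
  // checkpoint marker in the commit, e.g. a replication sequence number.
  // "writer" and "mySeqNo" here are assumed application values:
  //
  //   Map<String,String> userData = new HashMap<>();
  //   userData.put("sequence", Long.toString(mySeqNo));
  //   writer.setCommitData(userData);
  //   writer.commit();   // the map is now durably stored in the commit point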
2889
  /**
   * Returns the commit user data map that was last committed, or the one
   * that was set via {@link #setCommitData(Map)}.
   */
2894 public final synchronized Map<String,String> getCommitData() {
2895 return segmentInfos.getUserData();
2896 }
2897
  // Used only by commit and prepareCommit, below; lock
  // order is commitLock -> IW
2900 private final Object commitLock = new Object();
2901
  /**
   * <p>Commits all pending changes (added and deleted documents, segment
   * merges, added indexes, etc.) to the index, and syncs all referenced
   * index files, such that a reader will see the changes and the index
   * updates will survive an OS or machine crash or power loss.  Note that
   * this does not wait for any running background merges to finish.  This
   * may be a costly operation, so you should test the cost in your
   * application and do it only when really necessary.</p>
   *
   * <p>Note that this operation calls Directory.sync on the index files.
   * That call should not return until the file contents and metadata are on
   * stable storage.  For FSDirectory, this calls the OS's fsync.  But,
   * beware: some hardware devices may in fact cache writes even during
   * fsync, and return before the bits are actually on stable storage, to
   * give the appearance of faster performance.  If you have such a device,
   * and it does not have a battery backup (for example) then on power loss
   * it may still lose data.  Lucene cannot guarantee consistency on such
   * devices.</p>
   *
   * @see #prepareCommit
   */
2927 @Override
2928 public final void commit() throws IOException {
2929 ensureOpen();
2930 commitInternal(config.getMergePolicy());
2931 }
2932
  /**
   * Returns true if there may be changes that have not been committed.
   * There are cases where this may return true when there are no actual
   * "real" changes to the index, for example if you've deleted by Term or
   * Query but that Term or Query does not match any documents.  Also, if a
   * merge kicked off as a result of flushing a new segment during
   * {@link #commit}, or a concurrent merge finished, this method may return
   * true right after you had just called {@link #commit}.
   */
2942 public final boolean hasUncommittedChanges() {
2943 return changeCount.get() != lastCommitChangeCount || docWriter.anyChanges() || bufferedUpdatesStream.any();
2944 }
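  // Illustrative sketch: skip the (potentially costly) commit when nothing is
  // pending.  Because of the false positives described above this is an
  // optimization, not a correctness guarantee ("writer" is an assumed open
  // IndexWriter):
  //
  //   if (writer.hasUncommittedChanges()) {
  //     writer.commit();
  //   }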
2945
2946 private final void commitInternal(MergePolicy mergePolicy) throws IOException {
2947
2948 if (infoStream.isEnabled("IW")) {
2949 infoStream.message("IW", "commit: start");
2950 }
2951
2952 synchronized(commitLock) {
2953 ensureOpen(false);
2954
2955 if (infoStream.isEnabled("IW")) {
2956 infoStream.message("IW", "commit: enter lock");
2957 }
2958
2959 if (pendingCommit == null) {
2960 if (infoStream.isEnabled("IW")) {
2961 infoStream.message("IW", "commit: now prepare");
2962 }
2963 prepareCommitInternal(mergePolicy);
2964 } else {
2965 if (infoStream.isEnabled("IW")) {
2966 infoStream.message("IW", "commit: already prepared");
2967 }
2968 }
2969
2970 finishCommit();
2971 }
2972 }
2973
2974 private final void finishCommit() throws IOException {
2975
2976 boolean commitCompleted = false;
2977 boolean finished = false;
2978 String committedSegmentsFileName = null;
2979
2980 try {
2981 synchronized(this) {
2982 ensureOpen(false);
2983
2984 if (tragedy != null) {
2985 throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete commit", tragedy);
2986 }
2987
2988 if (pendingCommit != null) {
2989 try {
2990
2991 if (infoStream.isEnabled("IW")) {
2992 infoStream.message("IW", "commit: pendingCommit != null");
2993 }
2994
2995 committedSegmentsFileName = pendingCommit.finishCommit(directory);
            // we committed; if anything goes wrong after this, we are
            // screwed and it's a tragedy:
2998 commitCompleted = true;
2999
3000 if (infoStream.isEnabled("IW")) {
3001 infoStream.message("IW", "commit: done writing segments file \"" + committedSegmentsFileName + "\"");
3002 }
3003
            // NOTE: don't use this.checkpoint() here, because
            // we do not want to increment changeCount:
3006 deleter.checkpoint(pendingCommit, true);
3007
            // Carry over generation to our master SegmentInfos:
3009 segmentInfos.updateGeneration(pendingCommit);
3010
3011 lastCommitChangeCount = pendingCommitChangeCount;
3012 rollbackSegments = pendingCommit.createBackupSegmentInfos();
3013
3014 finished = true;
3015 } finally {
3016 notifyAll();
3017 try {
3018 if (finished) {
                  // all is good
3020 deleter.decRef(filesToCommit);
3021 } else if (commitCompleted == false) {
                  // exc happened in finishCommit: not a tragedy
3023 deleter.decRefWhileHandlingException(filesToCommit);
3024 }
3025 } finally {
3026 pendingCommit = null;
3027 filesToCommit = null;
3028 }
3029 }
3030 } else {
3031 assert filesToCommit == null;
3032 if (infoStream.isEnabled("IW")) {
3033 infoStream.message("IW", "commit: pendingCommit == null; skip");
3034 }
3035 }
3036 }
3037 } catch (Throwable t) {
3038 if (infoStream.isEnabled("IW")) {
3039 infoStream.message("IW", "hit exception during finishCommit: " + t.getMessage());
3040 }
3041 if (commitCompleted) {
3042 tragicEvent(t, "finishCommit");
3043 } else {
3044 IOUtils.reThrow(t);
3045 }
3046 }
3047
3048 if (infoStream.isEnabled("IW")) {
3049 infoStream.message("IW", String.format(Locale.ROOT, "commit: took %.1f msec", (System.nanoTime()-startCommitTime)/1000000.0));
3050 infoStream.message("IW", "commit: done");
3051 }
3052 }
3053
  // Ensures only one flush() is actually flushing segments
  // at a time:
3056 private final Object fullFlushLock = new Object();
3057
  // for assert
3059 boolean holdsFullFlushLock() {
3060 return Thread.holdsLock(fullFlushLock);
3061 }
3062
  /**
   * Moves all in-memory segments to the {@link Directory}, but does not
   * commit (fsync) them (call {@link #commit} for that).
   */
3065 public final void flush() throws IOException {
3066 flush(true, true);
3067 }
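  // Illustrative sketch of flush vs. commit: flush() writes buffered documents
  // to new segment files in the Directory (freeing indexing heap) but does not
  // fsync them or publish a new commit point; only commit() makes the changes
  // durable and visible to readers newly opened from the Directory.  "writer"
  // and "doc" are assumed application values:
  //
  //   writer.addDocument(doc);   // buffered in RAM
  //   writer.flush();            // segment files written, not yet fsync'd
  //   writer.commit();           // fsync + new segments_N: survives a crash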
3068
  /**
   * Flush all in-memory buffered updates (adds and deletes) to the
   * Directory.
   *
   * @param triggerMerge if true, we may merge segments (if deletes or docs
   *        were flushed) if necessary
   * @param applyAllDeletes whether pending deletes should also be applied
   */
3076 final void flush(boolean triggerMerge, boolean applyAllDeletes) throws IOException {
    // NOTE: this method cannot be sync'd because
    // maybeMerge() in turn calls mergeScheduler.merge which
    // in turn can take a long time to run and we don't want
    // to hold the lock for that.  In the case of
    // ConcurrentMergeScheduler this can lead to deadlock
    // when it stalls due to too many running merges.

    // We can be called during close, when closing==true, so we must pass
    // false to ensureOpen:
3086 ensureOpen(false);
3087 if (doFlush(applyAllDeletes) && triggerMerge) {
3088 maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
3089 }
3090 }
3091
  /** Returns true if a segment was flushed or deletes were applied. */
3093 private boolean doFlush(boolean applyAllDeletes) throws IOException {
3094 if (tragedy != null) {
3095 throw new IllegalStateException("this writer hit an unrecoverable error; cannot flush", tragedy);
3096 }
3097
3098 doBeforeFlush();
3099 testPoint("startDoFlush");
3100 boolean success = false;
3101 try {
3102
3103 if (infoStream.isEnabled("IW")) {
3104 infoStream.message("IW", " start flush: applyAllDeletes=" + applyAllDeletes);
3105 infoStream.message("IW", " index before flush " + segString());
3106 }
3107 boolean anyChanges = false;
3108
3109 synchronized (fullFlushLock) {
3110 boolean flushSuccess = false;
3111 try {
3112 anyChanges = docWriter.flushAllThreads();
3113 if (!anyChanges) {
            // prevent double increment since docWriter#doFlush increments
            // the flushCount if we indeed flushed
3115 flushCount.incrementAndGet();
3116 }
3117 flushSuccess = true;
3118 } finally {
3119 docWriter.finishFullFlush(this, flushSuccess);
3120 processEvents(false, true);
3121 }
3122 }
3123 synchronized(this) {
3124 anyChanges |= maybeApplyDeletes(applyAllDeletes);
3125 doAfterFlush();
3126 success = true;
3127 return anyChanges;
3128 }
3129 } catch (AbortingException | VirtualMachineError tragedy) {
3130 tragicEvent(tragedy, "doFlush");
      // never hit
3132 return false;
3133 } finally {
3134 if (!success) {
3135 if (infoStream.isEnabled("IW")) {
3136 infoStream.message("IW", "hit exception during flush");
3137 }
3138 }
3139 }
3140 }
3141
3142 final synchronized boolean maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
3143 if (applyAllDeletes) {
3144 if (infoStream.isEnabled("IW")) {
3145 infoStream.message("IW", "apply all deletes during flush");
3146 }
3147 return applyAllDeletesAndUpdates();
3148 } else if (infoStream.isEnabled("IW")) {
3149 infoStream.message("IW", "don't apply deletes now delTermCount=" + bufferedUpdatesStream.numTerms() + " bytesUsed=" + bufferedUpdatesStream.ramBytesUsed());
3150 }
3151
3152 return false;
3153 }
3154
3155 final synchronized boolean applyAllDeletesAndUpdates() throws IOException {
3156 flushDeletesCount.incrementAndGet();
3157 final BufferedUpdatesStream.ApplyDeletesResult result;
3158 if (infoStream.isEnabled("IW")) {
3159 infoStream.message("IW", "now apply all deletes for all segments maxDoc=" + (docWriter.getNumDocs() + segmentInfos.totalMaxDoc()));
3160 }
3161 result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList());
3162 if (result.anyDeletes) {
3163 checkpoint();
3164 }
3165 if (!keepFullyDeletedSegments && result.allDeleted != null) {
3166 if (infoStream.isEnabled("IW")) {
3167 infoStream.message("IW", "drop 100% deleted segments: " + segString(result.allDeleted));
3168 }
3169 for (SegmentCommitInfo info : result.allDeleted) {
        // If a merge has already registered for this
        // segment, we leave it in the readerPool; the
        // merge will skip merging it and will then drop
        // it once it's done:
3174 if (!mergingSegments.contains(info)) {
3175 segmentInfos.remove(info);
3176 pendingNumDocs.addAndGet(-info.info.maxDoc());
3177 readerPool.drop(info);
3178 }
3179 }
3180 checkpoint();
3181 }
3182 bufferedUpdatesStream.prune(segmentInfos);
3183 return result.anyDeletes;
3184 }
3185
3186
3187 DocumentsWriter getDocsWriter() {
3188 return docWriter;
3189 }
3190
  /** Expert: Return the number of documents currently buffered in RAM. */
3193 public final synchronized int numRamDocs() {
3194 ensureOpen();
3195 return docWriter.getNumDocs();
3196 }
3197
3198 private synchronized void ensureValidMerge(MergePolicy.OneMerge merge) {
3199 for(SegmentCommitInfo info : merge.segments) {
3200 if (!segmentInfos.contains(info)) {
3201 throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.info.name + ") that is not in the current index " + segString(), directoryOrig);
3202 }
3203 }
3204 }
3205
3206 private void skipDeletedDoc(DocValuesFieldUpdates.Iterator[] updatesIters, int deletedDoc) {
3207 for (DocValuesFieldUpdates.Iterator iter : updatesIters) {
3208 if (iter.doc() == deletedDoc) {
3209 iter.nextDoc();
3210 }
      // when entering the method, all iterators must already be beyond the
      // deleted document, or right on it, in which case we advance them over
      // the deleted document just now.
3214 assert iter.doc() > deletedDoc : "updateDoc=" + iter.doc() + " deletedDoc=" + deletedDoc;
3215 }
3216 }
3217
3218 private static class MergedDeletesAndUpdates {
3219 ReadersAndUpdates mergedDeletesAndUpdates = null;
3220 MergePolicy.DocMap docMap = null;
3221 boolean initializedWritableLiveDocs = false;
3222
3223 MergedDeletesAndUpdates() {}
3224
3225 final void init(ReaderPool readerPool, MergePolicy.OneMerge merge, MergeState mergeState, boolean initWritableLiveDocs) throws IOException {
3226 if (mergedDeletesAndUpdates == null) {
3227 mergedDeletesAndUpdates = readerPool.get(merge.info, true);
3228 docMap = merge.getDocMap(mergeState);
3229 assert docMap.isConsistent(merge.info.info.maxDoc());
3230 }
3231 if (initWritableLiveDocs && !initializedWritableLiveDocs) {
3232 mergedDeletesAndUpdates.initWritableLiveDocs();
3233 this.initializedWritableLiveDocs = true;
3234 }
3235 }
3236
3237 }
3238
3239 private void maybeApplyMergedDVUpdates(MergePolicy.OneMerge merge, MergeState mergeState, int docUpto,
3240 MergedDeletesAndUpdates holder, String[] mergingFields, DocValuesFieldUpdates[] dvFieldUpdates,
3241 DocValuesFieldUpdates.Iterator[] updatesIters, int curDoc) throws IOException {
3242 int newDoc = -1;
3243 for (int idx = 0; idx < mergingFields.length; idx++) {
3244 DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx];
3245 if (updatesIter.doc() == curDoc) {
3246 if (holder.mergedDeletesAndUpdates == null) {
3247 holder.init(readerPool, merge, mergeState, false);
3248 }
3249 if (newDoc == -1) {
3250 newDoc = holder.docMap.map(docUpto);
3251 }
3252 DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx];
3253 dvUpdates.add(newDoc, updatesIter.value());
3254 updatesIter.nextDoc();
3255 } else {
3256 assert updatesIter.doc() > curDoc : "field=" + mergingFields[idx] + " updateDoc=" + updatesIter.doc() + " curDoc=" + curDoc;
3257 }
3258 }
3259 }
3260
  /**
   * Carefully merges deletes and updates for the segments we just merged.
   * This is tricky because, although merging will clear all deletes (compacts
   * the documents) and compact all the updates, new deletes and updates may
   * have been flushed to the segments since the merge was started.  This
   * method "carries over" such new deletes and updates onto the newly merged
   * segment, and saves the resulting deletes and updates files (incrementing
   * the delete and DV generations for merge.info).  If no deletes were
   * flushed, no new deletes file is saved.
   */
3271 synchronized private ReadersAndUpdates commitMergedDeletesAndUpdates(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
3272
3273 testPoint("startCommitMergeDeletes");
3274
3275 final List<SegmentCommitInfo> sourceSegments = merge.segments;
3276
3277 if (infoStream.isEnabled("IW")) {
3278 infoStream.message("IW", "commitMergeDeletes " + segString(merge.segments));
3279 }
3280
    // Carefully merge deletes that occurred after we
    // started merging:
3283 int docUpto = 0;
3284 long minGen = Long.MAX_VALUE;
3285
    // Lazy init (only when we find a delete or update to carry over):
3287 final MergedDeletesAndUpdates holder = new MergedDeletesAndUpdates();
3288 final DocValuesFieldUpdates.Container mergedDVUpdates = new DocValuesFieldUpdates.Container();
3289
3290 for (int i = 0; i < sourceSegments.size(); i++) {
3291 SegmentCommitInfo info = sourceSegments.get(i);
3292 minGen = Math.min(info.getBufferedDeletesGen(), minGen);
3293 final int maxDoc = info.info.maxDoc();
3294 final Bits prevLiveDocs = merge.readers.get(i).getLiveDocs();
3295 final ReadersAndUpdates rld = readerPool.get(info, false);
3296
3297 assert rld != null: "seg=" + info.info.name;
3298 final Bits currentLiveDocs = rld.getLiveDocs();
3299 final Map<String,DocValuesFieldUpdates> mergingFieldUpdates = rld.getMergingFieldUpdates();
3300 final String[] mergingFields;
3301 final DocValuesFieldUpdates[] dvFieldUpdates;
3302 final DocValuesFieldUpdates.Iterator[] updatesIters;
3303 if (mergingFieldUpdates.isEmpty()) {
3304 mergingFields = null;
3305 updatesIters = null;
3306 dvFieldUpdates = null;
3307 } else {
3308 mergingFields = new String[mergingFieldUpdates.size()];
3309 dvFieldUpdates = new DocValuesFieldUpdates[mergingFieldUpdates.size()];
3310 updatesIters = new DocValuesFieldUpdates.Iterator[mergingFieldUpdates.size()];
3311 int idx = 0;
3312 for (Entry<String,DocValuesFieldUpdates> e : mergingFieldUpdates.entrySet()) {
3313 String field = e.getKey();
3314 DocValuesFieldUpdates updates = e.getValue();
3315 mergingFields[idx] = field;
3316 dvFieldUpdates[idx] = mergedDVUpdates.getUpdates(field, updates.type);
3317 if (dvFieldUpdates[idx] == null) {
3318 dvFieldUpdates[idx] = mergedDVUpdates.newUpdates(field, updates.type, mergeState.segmentInfo.maxDoc());
3319 }
3320 updatesIters[idx] = updates.iterator();
3321 updatesIters[idx].nextDoc();
3322 ++idx;
3323 }
3324 }
3325
3326
3327 if (prevLiveDocs != null) {
        // If we had deletions on starting the merge we must
        // still have deletions now:
3331 assert currentLiveDocs != null;
3332 assert prevLiveDocs.length() == maxDoc;
3333 assert currentLiveDocs.length() == maxDoc;
        // There were deletes on this segment when the merge
        // started.  The merge has collapsed away those
        // deletes, but, if new deletes were flushed since
        // the merge started, we must now carefully keep any
        // newly flushed deletes but mapping them to the new
        // docIDs.

        // Since we copy-on-write, if any new deletes were
        // applied after merging has started, we can just
        // check if the before/after liveDocs have changed.
        // If so, we must carefully merge the liveDocs one
        // doc at a time:
3347 if (currentLiveDocs != prevLiveDocs) {
          // This means this segment received new deletes
          // since we started the merge, so we
          // must merge them:
3351 for (int j = 0; j < maxDoc; j++) {
3352 if (!prevLiveDocs.get(j)) {
3353 assert !currentLiveDocs.get(j);
3354 } else {
3355 if (!currentLiveDocs.get(j)) {
3356 if (holder.mergedDeletesAndUpdates == null || !holder.initializedWritableLiveDocs) {
3357 holder.init(readerPool, merge, mergeState, true);
3358 }
3359 holder.mergedDeletesAndUpdates.delete(holder.docMap.map(docUpto));
3360 if (mergingFields != null) {
3361 skipDeletedDoc(updatesIters, j);
3362 }
3363 } else if (mergingFields != null) {
3364 maybeApplyMergedDVUpdates(merge, mergeState, docUpto, holder, mergingFields, dvFieldUpdates, updatesIters, j);
3365 }
3366 docUpto++;
3367 }
3368 }
        } else if (mergingFields != null) {
          // No new deletes, but we need to check each non-deleted document
          // for carried-over doc-values updates:
          for (int j = 0; j < maxDoc; j++) {
            if (prevLiveDocs.get(j)) {
              // Document isn't deleted; apply any pending field updates:
              maybeApplyMergedDVUpdates(merge, mergeState, docUpto, holder, mergingFields, dvFieldUpdates, updatesIters, j);
              // Advance docUpto for every non-deleted document:
              docUpto++;
            } else {
              // Advance all iterators beyond the deleted document:
              skipDeletedDoc(updatesIters, j);
            }
          }
        } else {
          docUpto += info.info.maxDoc() - info.getDelCount() - rld.getPendingDeleteCount();
        }
3385 } else if (currentLiveDocs != null) {
3386 assert currentLiveDocs.length() == maxDoc;
        // This segment had no deletes before but now it does:
3389 for (int j = 0; j < maxDoc; j++) {
3390 if (!currentLiveDocs.get(j)) {
3391 if (holder.mergedDeletesAndUpdates == null || !holder.initializedWritableLiveDocs) {
3392 holder.init(readerPool, merge, mergeState, true);
3393 }
3394 holder.mergedDeletesAndUpdates.delete(holder.docMap.map(docUpto));
3395 if (mergingFields != null) {
3396 skipDeletedDoc(updatesIters, j);
3397 }
3398 } else if (mergingFields != null) {
3399 maybeApplyMergedDVUpdates(merge, mergeState, docUpto, holder, mergingFields, dvFieldUpdates, updatesIters, j);
3400 }
3401 docUpto++;
3402 }
      } else if (mergingFields != null) {
        // No deletes before or after, but there were field updates:
        for (int j = 0; j < maxDoc; j++) {
          maybeApplyMergedDVUpdates(merge, mergeState, docUpto, holder, mergingFields, dvFieldUpdates, updatesIters, j);
          // Advance docUpto for every document:
          docUpto++;
        }
      } else {
        // No deletes or updates before or after:
        docUpto += info.info.maxDoc();
      }
3414 }
3415
3416 assert docUpto == merge.info.info.maxDoc();
3417
3418 if (mergedDVUpdates.any()) {
3419
3420 boolean success = false;
3421 try {
        // If any error occurs while writing the field updates we should
        // release the info, otherwise it stays in the pool but is considered
        // not "live", which later can result in a write-once directory
        // throwing an exception since the file will be overridden:
3428 holder.mergedDeletesAndUpdates.writeFieldUpdates(directory, mergedDVUpdates);
3429 success = true;
3430 } finally {
3431 if (!success) {
3432 holder.mergedDeletesAndUpdates.dropChanges();
3433 readerPool.drop(merge.info);
3434 }
3435 }
3436 }
3437
3438 if (infoStream.isEnabled("IW")) {
3439 if (holder.mergedDeletesAndUpdates == null) {
3440 infoStream.message("IW", "no new deletes or field updates since merge started");
3441 } else {
3442 String msg = holder.mergedDeletesAndUpdates.getPendingDeleteCount() + " new deletes";
3443 if (mergedDVUpdates.any()) {
3444 msg += " and " + mergedDVUpdates.size() + " new field updates";
3445 }
3446 msg += " since merge started";
3447 infoStream.message("IW", msg);
3448 }
3449 }
3450
3451 merge.info.setBufferedDeletesGen(minGen);
3452
3453 return holder.mergedDeletesAndUpdates;
3454 }
3455
3456 synchronized private boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
3457
3458 testPoint("startCommitMerge");
3459
3460 if (tragedy != null) {
3461 throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete merge", tragedy);
3462 }
3463
3464 if (infoStream.isEnabled("IW")) {
3465 infoStream.message("IW", "commitMerge: " + segString(merge.segments) + " index=" + segString());
3466 }
3467
3468 assert merge.registerDone;
3469
    // If merge was explicitly aborted, or, if rollback() or
    // rollbackTransaction() had been called since our merge
    // started (which results in an unqualified
    // deleter.refresh() call that will remove any index
    // file that current segments does not reference), we
    // abort this merge:
3476 if (merge.rateLimiter.getAbort()) {
3477 if (infoStream.isEnabled("IW")) {
3478 infoStream.message("IW", "commitMerge: skip: it was aborted");
3479 }
      // In case we opened and pooled a reader for this
      // segment, drop it now.  This ensures that we close
      // the reader before trying to delete any of its
      // files.  This is not a very big deal, since this
      // reader will never be used by any NRT reader, and
      // another thread is currently running close(false)
      // so it will be dropped shortly anyway, but not
      // doing this makes MockDirectoryWrapper angry in
      // TestNRTThreads (LUCENE-5434):
3489 readerPool.drop(merge.info);
3490
      // Safe: these files must exist
3492 deleteNewFiles(merge.info.files());
3493 return false;
3494 }
3495
3496 final ReadersAndUpdates mergedUpdates = merge.info.info.maxDoc() == 0 ? null : commitMergedDeletesAndUpdates(merge, mergeState);
3497
3498
3499
3500
3501
3502
3503
3504 assert !segmentInfos.contains(merge.info);
3505
3506 final boolean allDeleted = merge.segments.size() == 0 ||
3507 merge.info.info.maxDoc() == 0 ||
3508 (mergedUpdates != null &&
3509 mergedUpdates.getPendingDeleteCount() == merge.info.info.maxDoc());
3510
3511 if (infoStream.isEnabled("IW")) {
3512 if (allDeleted) {
3513 infoStream.message("IW", "merged segment " + merge.info + " is 100% deleted" + (keepFullyDeletedSegments ? "" : "; skipping insert"));
3514 }
3515 }
3516
3517 final boolean dropSegment = allDeleted && !keepFullyDeletedSegments;
    // If we merged no segments then we better be dropping
    // the new segment:
3521 assert merge.segments.size() > 0 || dropSegment;
3522
3523 assert merge.info.info.maxDoc() != 0 || keepFullyDeletedSegments || dropSegment;
3524
3525 if (mergedUpdates != null) {
3526 boolean success = false;
3527 try {
3528 if (dropSegment) {
3529 mergedUpdates.dropChanges();
3530 }
        // Pass false for assertInfoLive because the merged
        // segment is not yet live (only below do we commit it
        // to the segmentInfos):
3534 readerPool.release(mergedUpdates, false);
3535 success = true;
3536 } finally {
3537 if (!success) {
3538 mergedUpdates.dropChanges();
3539 readerPool.drop(merge.info);
3540 }
3541 }
3542 }
3543
    // Must do this after readerPool.release, in case an
    // exception is hit e.g. writing the live docs for the
    // merged segment, in which case we need to abort the
    // merge:
3548 segmentInfos.applyMergeChanges(merge, dropSegment);
3549
    // Now deduct the deleted docs that we just reclaimed from this merge:
3552 int delDocCount = merge.totalMaxDoc - merge.info.info.maxDoc();
3553 assert delDocCount >= 0;
3554 pendingNumDocs.addAndGet(-delDocCount);
3555
3556 if (dropSegment) {
3557 assert !segmentInfos.contains(merge.info);
3558 readerPool.drop(merge.info);
      // Safe: these files must exist
3560 deleteNewFiles(merge.info.files());
3561 }
3562
3563 boolean success = false;
3564 try {
      // Release the merged readers; even if this throws, the finally
      // clause below still checkpoints so the deleter sees the new
      // segment:
3568 closeMergeReaders(merge, false);
3569 success = true;
3570 } finally {
      // Must note the change to segmentInfos so any commits
      // in-flight don't lose it:
3574 if (success) {
3575 checkpoint();
3576 } else {
3577 try {
3578 checkpoint();
3579 } catch (Throwable t) {
          // Ignore, so we keep throwing the original exception
3581 }
3582 }
3583 }
3584
3585 deleter.deletePendingFiles();
3586
3587 if (infoStream.isEnabled("IW")) {
3588 infoStream.message("IW", "after commitMerge: " + segString());
3589 }
3590
3591 if (merge.maxNumSegments != -1 && !dropSegment) {
      // cascade the forceMerge:
3593 if (!segmentsToMerge.containsKey(merge.info)) {
3594 segmentsToMerge.put(merge.info, Boolean.FALSE);
3595 }
3596 }
3597
3598 return true;
3599 }
3600
3601 final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException {
3602
3603 if (infoStream.isEnabled("IW")) {
3604 infoStream.message("IW", "handleMergeException: merge=" + segString(merge.segments) + " exc=" + t);
3605 }
3606
    // Set the exception on the merge, so if forceMerge is waiting on
    // us it sees the root cause exception:
3610 merge.setException(t);
3611 addMergeException(merge);
3612
3613 if (t instanceof MergePolicy.MergeAbortedException) {
      // We can ignore this exception (it happens when
      // deleteAll or rollback is called), unless the
      // merge involves segments from external directories,
      // in which case we must throw it so, for example, the
      // rollbackTransaction code in addIndexes* is
      // executed.
3620 if (merge.isExternal) {
3621 throw (MergePolicy.MergeAbortedException) t;
3622 }
3623 } else {
3624 IOUtils.reThrow(t);
3625 }
3626 }
3627
  /**
   * Merges the indicated segments, replacing them in the stack with a single
   * segment.
   *
   * @lucene.experimental
   */
3634 public void merge(MergePolicy.OneMerge merge) throws IOException {
3635
3636 boolean success = false;
3637
3638 rateLimiters.set(merge.rateLimiter);
3639
3640 final long t0 = System.currentTimeMillis();
3641
3642 final MergePolicy mergePolicy = config.getMergePolicy();
3643 try {
3644 try {
3645 try {
3646 mergeInit(merge);
3647
3648
3649
3650
3651 if (infoStream.isEnabled("IW")) {
3652 infoStream.message("IW", "now merge\n merge=" + segString(merge.segments) + "\n index=" + segString());
3653 }
3654
3655 mergeMiddle(merge, mergePolicy);
3656 mergeSuccess(merge);
3657 success = true;
3658 } catch (Throwable t) {
3659 handleMergeException(t, merge);
3660 }
3661 } finally {
3662 synchronized(this) {
3663
3664 mergeFinish(merge);
3665
3666 if (success == false) {
3667 if (infoStream.isEnabled("IW")) {
3668 infoStream.message("IW", "hit exception during merge");
3669 }
3670 } else if (merge.rateLimiter.getAbort() == false && (merge.maxNumSegments != -1 || (!closed && !closing))) {
            // This merge (and, generally, any change to the
            // segments) may now enable new merges, so we
            // call merge policy again to see if more merges
            // are possible:
3674 updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments);
3675 }
3676 }
3677 }
3678 } catch (Throwable t) {
      // Important that tragicEvent is called after mergeFinish, else we
      // hang waiting for our merge thread to be removed from runningMerges:
3681 tragicEvent(t, "merge");
3682 }
3683
3684 if (merge.info != null && merge.rateLimiter.getAbort() == false) {
3685 if (infoStream.isEnabled("IW")) {
3686 infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.maxDoc() + " docs");
3687 }
3688 }
3689 }
3690
  /** Hook that's called when the specified merge is complete. */
3692 void mergeSuccess(MergePolicy.OneMerge merge) {
3693 }
3694
  /** Checks whether this merge involves any segments already participating
   *  in a merge.  If not, this merge is "registered", meaning we record that
   *  its segments are now participating in a merge, and true is returned.
   *  Else (the merge conflicts) false is returned. */
3701 final synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws IOException {
3702
3703 if (merge.registerDone) {
3704 return true;
3705 }
3706 assert merge.segments.size() > 0;
3707
3708 if (stopMerges) {
3709 merge.rateLimiter.setAbort();
3710 throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
3711 }
3712
3713 boolean isExternal = false;
3714 for(SegmentCommitInfo info : merge.segments) {
3715 if (mergingSegments.contains(info)) {
3716 if (infoStream.isEnabled("IW")) {
3717 infoStream.message("IW", "reject merge " + segString(merge.segments) + ": segment " + segString(info) + " is already marked for merge");
3718 }
3719 return false;
3720 }
3721 if (!segmentInfos.contains(info)) {
3722 if (infoStream.isEnabled("IW")) {
3723 infoStream.message("IW", "reject merge " + segString(merge.segments) + ": segment " + segString(info) + " does not exist in live infos");
3724 }
3725 return false;
3726 }
3727 if (info.info.dir != directoryOrig) {
3728 isExternal = true;
3729 }
3730 if (segmentsToMerge.containsKey(info)) {
3731 merge.maxNumSegments = mergeMaxNumSegments;
3732 }
3733 }
3734
3735 ensureValidMerge(merge);
3736
3737 pendingMerges.add(merge);
3738
3739 if (infoStream.isEnabled("IW")) {
3740 infoStream.message("IW", "add merge to pendingMerges: " + segString(merge.segments) + " [total " + pendingMerges.size() + " pending]");
3741 }
3742
3743 merge.mergeGen = mergeGen;
3744 merge.isExternal = isExternal;
3745
    // OK it does not conflict; now record that this merge
    // is running (while synchronized) to avoid race
    // condition where two conflicting merges from different
    // threads, start
    if (infoStream.isEnabled("IW")) {
      StringBuilder builder = new StringBuilder("registerMerge merging= [");
      for (SegmentCommitInfo info : mergingSegments) {
        builder.append(info.info.name).append(", ");
      }
      builder.append("]");
      // don't call mergingSegments.toString(): it could lead to a
      // ConcurrentModificationException due to concurrent merges
      infoStream.message("IW", builder.toString());
    }
3762 for(SegmentCommitInfo info : merge.segments) {
3763 if (infoStream.isEnabled("IW")) {
3764 infoStream.message("IW", "registerMerge info=" + segString(info));
3765 }
3766 mergingSegments.add(info);
3767 }
3768
3769 assert merge.estimatedMergeBytes == 0;
3770 assert merge.totalMergeBytes == 0;
3771 for(SegmentCommitInfo info : merge.segments) {
3772 if (info.info.maxDoc() > 0) {
3773 final int delCount = numDeletedDocs(info);
3774 assert delCount <= info.info.maxDoc();
3775 final double delRatio = ((double) delCount)/info.info.maxDoc();
3776 merge.estimatedMergeBytes += info.sizeInBytes() * (1.0 - delRatio);
3777 merge.totalMergeBytes += info.sizeInBytes();
3778 }
3779 }
3780
3781
3782 merge.registerDone = true;
3783
3784 return true;
3785 }
3786
  /** Does initial setup for a merge, which is fast but holds
   *  the synchronized lock on IndexWriter instance. */
3789 final synchronized void mergeInit(MergePolicy.OneMerge merge) throws IOException {
3790 boolean success = false;
3791 try {
3792 _mergeInit(merge);
3793 success = true;
3794 } finally {
3795 if (!success) {
3796 if (infoStream.isEnabled("IW")) {
3797 infoStream.message("IW", "hit exception in mergeInit");
3798 }
3799 mergeFinish(merge);
3800 }
3801 }
3802 }
3803
3804 synchronized private void _mergeInit(MergePolicy.OneMerge merge) throws IOException {
3805
3806 testPoint("startMergeInit");
3807
3808 assert merge.registerDone;
3809 assert merge.maxNumSegments == -1 || merge.maxNumSegments > 0;
3810
3811 if (tragedy != null) {
3812 throw new IllegalStateException("this writer hit an unrecoverable error; cannot merge", tragedy);
3813 }
3814
3815 if (merge.info != null) {
      // mergeInit already done
3817 return;
3818 }
3819
3820 if (merge.rateLimiter.getAbort()) {
3821 return;
3822 }
3823
    // TODO: in the non-pooled case this is somewhat
    // wasteful, because we open these readers, close them,
    // and then open them again for merging.  Maybe we
    // could pre-pool them somehow in that case...
3829 if (infoStream.isEnabled("IW")) {
3830 infoStream.message("IW", "now apply deletes for " + merge.segments.size() + " merging segments");
3831 }
    // Lock order: IW -> BD
3834 final BufferedUpdatesStream.ApplyDeletesResult result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, merge.segments);
3835
3836 if (result.anyDeletes) {
3837 checkpoint();
3838 }
3839
3840 if (!keepFullyDeletedSegments && result.allDeleted != null) {
3841 if (infoStream.isEnabled("IW")) {
3842 infoStream.message("IW", "drop 100% deleted segments: " + result.allDeleted);
3843 }
3844 for(SegmentCommitInfo info : result.allDeleted) {
3845 segmentInfos.remove(info);
3846 pendingNumDocs.addAndGet(-info.info.maxDoc());
3847 if (merge.segments.contains(info)) {
3848 mergingSegments.remove(info);
3849 merge.segments.remove(info);
3850 }
3851 readerPool.drop(info);
3852 }
3853 checkpoint();
3854 }
3855
    // Bind a new segment name here so even with concurrent merges
    // we keep deterministic segment names:
3859 final String mergeSegmentName = newSegmentName();
3860 SegmentInfo si = new SegmentInfo(directoryOrig, Version.LATEST, mergeSegmentName, -1, false, codec, Collections.<String,String>emptyMap(), StringHelper.randomId(), new HashMap<String,String>());
3861 Map<String,String> details = new HashMap<>();
3862 details.put("mergeMaxNumSegments", "" + merge.maxNumSegments);
3863 details.put("mergeFactor", Integer.toString(merge.segments.size()));
3864 setDiagnostics(si, SOURCE_MERGE, details);
3865 merge.setMergeInfo(new SegmentCommitInfo(si, 0, -1L, -1L, -1L));
    // Lock order: IW -> BD
3870 bufferedUpdatesStream.prune(segmentInfos);
3871
3872 if (infoStream.isEnabled("IW")) {
3873 infoStream.message("IW", "merge seg=" + merge.info.info.name + " " + segString(merge.segments));
3874 }
3875 }
3876
3877 static void setDiagnostics(SegmentInfo info, String source) {
3878 setDiagnostics(info, source, null);
3879 }
3880
3881 private static void setDiagnostics(SegmentInfo info, String source, Map<String,String> details) {
3882 Map<String,String> diagnostics = new HashMap<>();
3883 diagnostics.put("source", source);
3884 diagnostics.put("lucene.version", Version.LATEST.toString());
3885 diagnostics.put("os", Constants.OS_NAME);
3886 diagnostics.put("os.arch", Constants.OS_ARCH);
3887 diagnostics.put("os.version", Constants.OS_VERSION);
3888 diagnostics.put("java.version", Constants.JAVA_VERSION);
3889 diagnostics.put("java.vendor", Constants.JAVA_VENDOR);
    // On IBM J9 JVM this is better than java.version which is just 1.7.0
    // (no update level):
3891 diagnostics.put("java.runtime.version", System.getProperty("java.runtime.version", "undefined"));
    // Hotspot version, e.g. 2.8 for J9:
3893 diagnostics.put("java.vm.version", System.getProperty("java.vm.version", "undefined"));
3894 diagnostics.put("timestamp", Long.toString(new Date().getTime()));
3895 if (details != null) {
3896 diagnostics.putAll(details);
3897 }
3898 info.setDiagnostics(diagnostics);
3899 }
3900
  /** Does finishing for a merge, which is fast but holds the synchronized
   *  lock on IndexWriter instance. */
3903 final synchronized void mergeFinish(MergePolicy.OneMerge merge) {
3904
    // forceMerge, addIndexes or waitForMerges may be waiting
    // on merges to finish.
3907 notifyAll();
3908
    // It's possible we are called twice, eg if there was an
    // exception inside mergeInit
3911 if (merge.registerDone) {
3912 final List<SegmentCommitInfo> sourceSegments = merge.segments;
3913 for (SegmentCommitInfo info : sourceSegments) {
3914 mergingSegments.remove(info);
3915 }
3916 merge.registerDone = false;
3917 }
3918
3919 runningMerges.remove(merge);
3920 }
3921
3922 private final synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
3923 final int numSegments = merge.readers.size();
3924 Throwable th = null;
3925
3926 boolean drop = !suppressExceptions;
3927
3928 for (int i = 0; i < numSegments; i++) {
3929 final SegmentReader sr = merge.readers.get(i);
3930 if (sr != null) {
3931 try {
3932 final ReadersAndUpdates rld = readerPool.get(sr.getSegmentInfo(), false);
          // We still hold a ref so it should not have been removed:
3934 assert rld != null;
3935 if (drop) {
3936 rld.dropChanges();
3937 } else {
3938 rld.dropMergingUpdates();
3939 }
3940 rld.release(sr);
3941 readerPool.release(rld);
3942 if (drop) {
3943 readerPool.drop(rld.info);
3944 }
3945 } catch (Throwable t) {
3946 if (th == null) {
3947 th = t;
3948 }
3949 }
3950 merge.readers.set(i, null);
3951 }
3952 }
3953
3954 try {
3955 merge.mergeFinished();
3956 } catch (Throwable t) {
3957 if (th == null) {
3958 th = t;
3959 }
3960 }
3961
    // If any error occurred, throw it.
3963 if (!suppressExceptions) {
3964 IOUtils.reThrow(th);
3965 }
3966 }
3967
  /** Does the actual (time-consuming) work of the merge, but without holding
   *  synchronized lock on IndexWriter instance. */
3971 private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
3972
3973 merge.rateLimiter.checkAbort();
3974
3975 List<SegmentCommitInfo> sourceSegments = merge.segments;
3976
3977 IOContext context = new IOContext(merge.getStoreMergeInfo());
3978
3979 final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory);
3980
3981 if (infoStream.isEnabled("IW")) {
3982 infoStream.message("IW", "merging " + segString(merge.segments));
3983 }
3984
3985 merge.readers = new ArrayList<>(sourceSegments.size());
3986
    // This is try/finally to make sure merger's readers are
    // closed:
3989 boolean success = false;
3990 try {
3991 int segUpto = 0;
3992 while(segUpto < sourceSegments.size()) {
3993
3994 final SegmentCommitInfo info = sourceSegments.get(segUpto);
3995
        // Hold onto the "live" reader; we will use this to
        // commit merged deletes
3998 final ReadersAndUpdates rld = readerPool.get(info, true);
3999
        // Carefully pull the most recent live docs and reader
4001 SegmentReader reader;
4002 final Bits liveDocs;
4003 final int delCount;
4004
4005 synchronized (this) {
          // Must sync to ensure BufferedDeletesStream cannot change liveDocs,
          // pendingDeleteCount and field updates while we pull a copy:
4008 reader = rld.getReaderForMerge(context);
4009 liveDocs = rld.getReadOnlyLiveDocs();
4010 delCount = rld.getPendingDeleteCount() + info.getDelCount();
4011
4012 assert reader != null;
4013 assert rld.verifyDocCounts();
4014
4015 if (infoStream.isEnabled("IW")) {
4016 if (rld.getPendingDeleteCount() != 0) {
4017 infoStream.message("IW", "seg=" + segString(info) + " delCount=" + info.getDelCount() + " pendingDelCount=" + rld.getPendingDeleteCount());
4018 } else if (info.getDelCount() != 0) {
4019 infoStream.message("IW", "seg=" + segString(info) + " delCount=" + info.getDelCount());
4020 } else {
4021 infoStream.message("IW", "seg=" + segString(info) + " no deletes");
4022 }
4023 }
4024 }
4025
        // Deletes might have happened after we pulled the merge reader and
        // before we got a read-only copy of the segment's actual live docs
        // (taking pending deletes into account).  In that case we need to
        // make a new reader with updated live docs and del count.
4030 if (reader.numDeletedDocs() != delCount) {
          // fix the reader's live docs and del count
4032 assert delCount > reader.numDeletedDocs();
4033
4034 SegmentReader newReader;
4035
4036 synchronized (this) {
            // We must also sync on IW here, because another thread could be
            // writing new DV updates / removing old gen field infos files
            // causing FNFE:
4039 newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - delCount);
4040 }
4041
4042 boolean released = false;
4043 try {
4044 rld.release(reader);
4045 released = true;
4046 } finally {
4047 if (!released) {
4048 newReader.decRef();
4049 }
4050 }
4051
4052 reader = newReader;
4053 }
4054
4055 merge.readers.add(reader);
4056 assert delCount <= info.info.maxDoc(): "delCount=" + delCount + " info.maxDoc=" + info.info.maxDoc() + " rld.pendingDeleteCount=" + rld.getPendingDeleteCount() + " info.getDelCount()=" + info.getDelCount();
4057 segUpto++;
4058 }
4059
      // merge.getMergeReaders() lets the OneMerge provide a (possibly
      // wrapped) view of the readers to merge:
4064 final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(),
4065 merge.info.info, infoStream, dirWrapper,
4066 globalFieldNumberMap,
4067 context);
4068
4069 merge.rateLimiter.checkAbort();
4070
4071 merge.mergeStartNS = System.nanoTime();
4072
      // This is where all the work happens:
4074 if (merger.shouldMerge()) {
4075 merger.merge();
4076 }
4077
4078 MergeState mergeState = merger.mergeState;
4079 assert mergeState.segmentInfo == merge.info.info;
4080 merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles()));
4081
4082 if (infoStream.isEnabled("IW")) {
4083 if (merger.shouldMerge()) {
4084 long t1 = System.nanoTime();
4085 double sec = (t1-merge.mergeStartNS)/1000000000.;
4086 double segmentMB = (merge.info.sizeInBytes()/1024./1024.);
4087 double stoppedSec = merge.rateLimiter.getTotalStoppedNS()/1000000000.;
4088 double throttleSec = merge.rateLimiter.getTotalPausedNS()/1000000000.;
4089 infoStream.message("IW", "merge codec=" + codec + " maxDoc=" + merge.info.info.maxDoc() + "; merged segment has " +
4090 (mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
4091 (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
4092 (mergeState.mergeFieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
4093 (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " +
                             (mergeState.mergeFieldInfos.hasFreq() ? "freqs" : "no freqs") + "; " +
4095 String.format(Locale.ROOT,
4096 "%.1f sec (%.1f sec stopped, %.1f sec paused) to merge segment [%.2f MB, %.2f MB/sec]",
4097 sec,
4098 stoppedSec,
4099 throttleSec,
4100 segmentMB,
4101 segmentMB / sec));
4102 } else {
4103 infoStream.message("IW", "skip merging fully deleted segments");
4104 }
4105 }
4106
4107 if (merger.shouldMerge() == false) {
        // Merge would produce a 0-doc segment, so we do nothing except
        // commit the merge to remove all the 0-doc segments that we "merged":
4109 assert merge.info.info.maxDoc() == 0;
4110 commitMerge(merge, mergeState);
4111 return 0;
4112 }
4113
4114 assert merge.info.info.maxDoc() > 0;
4115
      // Must ask the merge policy, under IW's lock, whether the newly
      // merged segment should use a compound file:
4120 boolean useCompoundFile;
4121 synchronized (this) {
4122 useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, merge.info, this);
4123 }
4124
4125 if (useCompoundFile) {
4126 success = false;
4127
4128 Collection<String> filesToRemove = merge.info.files();
4129 TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory);
4130 try {
4131 createCompoundFile(infoStream, trackingCFSDir, merge.info.info, context);
4132 success = true;
4133 } catch (Throwable t) {
4134 synchronized(this) {
4135 if (merge.rateLimiter.getAbort()) {
            // This can happen if rollback is called while we were building
            // our CFS -- fall through to logic below to remove the non-CFS
            // merged files:
4139 if (infoStream.isEnabled("IW")) {
4140 infoStream.message("IW", "hit merge abort exception creating compound file during merge");
4141 }
4142 return 0;
4143 } else {
4144 handleMergeException(t, merge);
4145 }
4146 }
4147 } finally {
4148 if (success == false) {
4149 if (infoStream.isEnabled("IW")) {
4150 infoStream.message("IW", "hit exception creating compound file during merge");
4151 }
            // Safe: these files must exist
4153 deleteNewFiles(merge.info.files());
4154 }
4155 }
4156
        // So that, if we hit exc in deleteNewFiles (next)
        // or in commitMerge (later), we close the
        // per-segment readers in the finally clause below:
4160 success = false;
4161
4162 synchronized(this) {
          // delete new non-CFS files directly: they were never
          // registered with IFD
4166 deleteNewFiles(filesToRemove);
4167
4168 if (merge.rateLimiter.getAbort()) {
4169 if (infoStream.isEnabled("IW")) {
4170 infoStream.message("IW", "abort merge after building CFS");
4171 }
            // Safe: these files must exist
4173 deleteNewFiles(merge.info.files());
4174 return 0;
4175 }
4176 }
4177
4178 merge.info.info.setUseCompoundFile(true);
4179 } else {
        // So that, if we hit exc in commitMerge (later),
        // we close the per-segment readers in the finally
        // clause below:
4183 success = false;
4184 }
4185
      // Have codec write SegmentInfo.  Must do this after
      // creating CFS so that 1) .si isn't slurped into CFS,
      // and 2) .si reflects useCompoundFile=true change
      // above:
4190 boolean success2 = false;
4191 try {
4192 codec.segmentInfoFormat().write(directory, merge.info.info, context);
4193 success2 = true;
4194 } finally {
4195 if (!success2) {
          // Safe: these files must exist
4197 deleteNewFiles(merge.info.files());
4198 }
4199 }
4200
      // TODO: ideally we would freeze merge.info here, because
      // any changes after writing the .si will be lost...
4205 if (infoStream.isEnabled("IW")) {
4206 infoStream.message("IW", String.format(Locale.ROOT, "merged segment size=%.3f MB vs estimate=%.3f MB", merge.info.sizeInBytes()/1024./1024., merge.estimatedMergeBytes/1024/1024.));
4207 }
4208
4209 final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer();
4210 if (poolReaders && mergedSegmentWarmer != null) {
4211 final ReadersAndUpdates rld = readerPool.get(merge.info, true);
4212 final SegmentReader sr = rld.getReader(IOContext.READ);
4213 try {
4214 mergedSegmentWarmer.warm(sr);
4215 } finally {
4216 synchronized(this) {
4217 rld.release(sr);
4218 readerPool.release(rld);
4219 }
4220 }
4221 }
4222
4223 if (!commitMerge(merge, mergeState)) {
        // commitMerge will return false if this merge was
        // aborted
4226 return 0;
4227 }
4228
4229 success = true;
4230
4231 } finally {
      // Readers are already closed in commitMerge if we didn't hit
      // an exc:
4234 if (success == false) {
4235 closeMergeReaders(merge, true);
4236 }
4237 }
4238
4239 return merge.info.info.maxDoc();
4240 }
4241
4242 synchronized void addMergeException(MergePolicy.OneMerge merge) {
4243 assert merge.getException() != null;
4244 if (!mergeExceptions.contains(merge) && mergeGen == merge.mergeGen) {
4245 mergeExceptions.add(merge);
4246 }
4247 }
4248
  // For test purposes.
4250 final int getBufferedDeleteTermsSize() {
4251 return docWriter.getBufferedDeleteTermsSize();
4252 }
4253
  // For test purposes.
4255 final int getNumBufferedDeleteTerms() {
4256 return docWriter.getNumBufferedDeleteTerms();
4257 }
4258
  // Utility routine for tests.
4260 synchronized SegmentCommitInfo newestSegment() {
4261 return segmentInfos.size() > 0 ? segmentInfos.info(segmentInfos.size()-1) : null;
4262 }
4263
  /** Returns a string description of all segments, for
   *  debugging.
   *
   * @lucene.internal */
4268 synchronized String segString() {
4269 return segString(segmentInfos);
4270 }
4271
  /** Returns a string description of the specified
   *  segments, for debugging.
   *
   * @lucene.internal */
4276 synchronized String segString(Iterable<SegmentCommitInfo> infos) {
4277 final StringBuilder buffer = new StringBuilder();
4278 for(final SegmentCommitInfo info : infos) {
4279 if (buffer.length() > 0) {
4280 buffer.append(' ');
4281 }
4282 buffer.append(segString(info));
4283 }
4284 return buffer.toString();
4285 }
4286
  /** Returns a string description of the specified
   *  segment, for debugging.
   *
   * @lucene.internal */
4291 synchronized String segString(SegmentCommitInfo info) {
4292 return info.toString(numDeletedDocs(info) - info.getDelCount());
4293 }
4294
4295 private synchronized void doWait() {
    // NOTE: the callers of this method should in theory
    // be able to do simply wait(), but, as a defense
    // against thread timing hazards where notifyAll()
    // fails to be called, we wait for at most 1 second
    // and then return so caller can check if wait
    // conditions are satisfied:
4302 try {
4303 wait(1000);
4304 } catch (InterruptedException ie) {
4305 throw new ThreadInterruptedException(ie);
4306 }
4307 }
4308
4309 private boolean keepFullyDeletedSegments;
4310
  // Only for testing.
4314 void setKeepFullyDeletedSegments(boolean v) {
4315 keepFullyDeletedSegments = v;
4316 }
4317
4318 boolean getKeepFullyDeletedSegments() {
4319 return keepFullyDeletedSegments;
4320 }
4321
  // called only from assert
4323 private boolean filesExist(SegmentInfos toSync) throws IOException {
4324
4325 Collection<String> files = toSync.files(false);
4326 for(final String fileName: files) {
4327 assert slowFileExists(directory, fileName): "file " + fileName + " does not exist; files=" + Arrays.toString(directory.listAll());
      // If this trips it means we are missing a call to
      // .checkpoint somewhere, because by the time we
      // are called, deleter should know about every
      // file referenced by the current head
      // segmentInfos:
4333 assert deleter.exists(fileName): "IndexFileDeleter doesn't know about file " + fileName;
4334 }
4335 return true;
4336 }
4337
  // For infoStream output
4339 synchronized SegmentInfos toLiveInfos(SegmentInfos sis) {
4340 final SegmentInfos newSIS = new SegmentInfos();
4341 final Map<SegmentCommitInfo,SegmentCommitInfo> liveSIS = new HashMap<>();
4342 for(SegmentCommitInfo info : segmentInfos) {
4343 liveSIS.put(info, info);
4344 }
4345 for(SegmentCommitInfo info : sis) {
4346 SegmentCommitInfo liveInfo = liveSIS.get(info);
4347 if (liveInfo != null) {
4348 info = liveInfo;
4349 }
4350 newSIS.add(info);
4351 }
4352
4353 return newSIS;
4354 }
4355
  /** Walk through all files referenced by the current segmentInfos and ask
   *  the Directory to sync each file, if it wasn't already.  If that
   *  succeeds, then we prepare a new segments_N file but do not fully
   *  commit it. */
4361 private void startCommit(final SegmentInfos toSync) throws IOException {
4362
4363 testPoint("startStartCommit");
4364 assert pendingCommit == null;
4365
4366 if (tragedy != null) {
4367 throw new IllegalStateException("this writer hit an unrecoverable error; cannot commit", tragedy);
4368 }
4369
4370 try {
4371
4372 if (infoStream.isEnabled("IW")) {
4373 infoStream.message("IW", "startCommit(): start");
4374 }
4375
4376 synchronized(this) {
4377
4378 if (lastCommitChangeCount > changeCount.get()) {
4379 throw new IllegalStateException("lastCommitChangeCount=" + lastCommitChangeCount + ",changeCount=" + changeCount);
4380 }
4381
4382 if (pendingCommitChangeCount == lastCommitChangeCount) {
4383 if (infoStream.isEnabled("IW")) {
4384 infoStream.message("IW", " skip startCommit(): no changes pending");
4385 }
4386 try {
4387 deleter.decRef(filesToCommit);
4388 } finally {
4389 filesToCommit = null;
4390 }
4391 return;
4392 }
4393
4394 if (infoStream.isEnabled("IW")) {
4395 infoStream.message("IW", "startCommit index=" + segString(toLiveInfos(toSync)) + " changeCount=" + changeCount);
4396 }
4397
4398 assert filesExist(toSync);
4399 }
4400
4401 testPoint("midStartCommit");
4402
4403 boolean pendingCommitSet = false;
4404
4405 try {
4406
4407 testPoint("midStartCommit2");
4408
4409 synchronized(this) {
4410
4411 assert pendingCommit == null;
4412
4413 assert segmentInfos.getGeneration() == toSync.getGeneration();
4414
          // Exception here means nothing is prepared
          // (this method unwinds everything it did on
          // an exception)
4418 toSync.prepareCommit(directory);
4419 if (infoStream.isEnabled("IW")) {
4420 infoStream.message("IW", "startCommit: wrote pending segments file \"" + IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", toSync.getGeneration()) + "\"");
4421 }
4422
4423
4424
4425 pendingCommitSet = true;
4426 pendingCommit = toSync;
4427 }
4428
        // This call can take a long time -- 10s of seconds
        // or more.  We do it without syncing on this:
4431 boolean success = false;
4432 final Collection<String> filesToSync;
4433 try {
4434 filesToSync = toSync.files(false);
4435 directory.sync(filesToSync);
4436 success = true;
4437 } finally {
4438 if (!success) {
4439 pendingCommitSet = false;
4440 pendingCommit = null;
4441 toSync.rollbackCommit(directory);
4442 }
4443 }
4444
4445 if (infoStream.isEnabled("IW")) {
4446 infoStream.message("IW", "done all syncs: " + filesToSync);
4447 }
4448
4449 testPoint("midStartCommitSuccess");
4450
4451 } finally {
4452 synchronized(this) {
            // Have our master segmentInfos record the
            // generations we just prepared.  We do this
            // on error or success so we don't
            // double-write a segments_N file.
4457 segmentInfos.updateGeneration(toSync);
4458
4459 if (!pendingCommitSet) {
4460 if (infoStream.isEnabled("IW")) {
4461 infoStream.message("IW", "hit exception committing segments file");
4462 }
              // Hit exception
4465 deleter.decRefWhileHandlingException(filesToCommit);
4466 filesToCommit = null;
4467 }
4468 }
4469 }
4470 } catch (VirtualMachineError tragedy) {
4471 tragicEvent(tragedy, "startCommit");
4472 }
4473 testPoint("finishStartCommit");
4474 }
4475
  /** Returns <code>true</code> iff the index in the named directory is
   *  currently locked.
   *
   * @param directory the directory to check for a lock
   * @throws IOException if there is a low-level IO error
   * @deprecated Use of this method can only lead to race conditions.  Try
   *             to actually obtain a lock instead. */
4484 @Deprecated
4485 public static boolean isLocked(Directory directory) throws IOException {
4486 try {
4487 directory.obtainLock(WRITE_LOCK_NAME).close();
4488 return false;
4489 } catch (LockObtainFailedException failed) {
4490 return true;
4491 }
4492 }
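  // Illustrative sketch of the recommended alternative to isLocked(): rather
  // than checking, actually obtain the lock and react to failure ("directory"
  // is an assumed Directory instance):
  //
  //   try (Lock lock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
  //     // ... we hold the write lock here ...
  //   } catch (LockObtainFailedException e) {
  //     // another IndexWriter holds the lock
  //   }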
4493
  /** If this writer is in near real-time mode (a reader was opened directly
   *  from it), then after a merge completes, this class can be invoked to
   *  warm the reader on the newly merged segment, before the merge commits.
   *  This is not required for near real-time search, but will reduce search
   *  latency on opening a new near real-time reader after a merge completes.
   *
   *  <p><b>NOTE</b>: {@link #warm(LeafReader)} is called before any deletes
   *  have been carried over to the merged segment.
   *
   * @lucene.experimental */
4507 public static abstract class IndexReaderWarmer {
    /** Sole constructor.  (For invocation by subclass constructors,
     *  typically implicit.) */
4511 protected IndexReaderWarmer() {
4512 }
4513
    /** Invoked on the {@link LeafReader} for the newly merged segment,
     *  before that segment is made visible to near-real-time readers. */
4517 public abstract void warm(LeafReader reader) throws IOException;
4518 }
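  // Illustrative warmer sketch (assumptions: "config" is the IndexWriterConfig
  // used to open the writer, and "sampleQuery" is an application-defined
  // Query; IndexSearcher comes from org.apache.lucene.search).  Installed via
  // IndexWriterConfig#setMergedSegmentWarmer before opening the writer:
  //
  //   config.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
  //     @Override
  //     public void warm(LeafReader reader) throws IOException {
  //       // Touch lazily loaded data structures, e.g. by running a query:
  //       new IndexSearcher(reader).search(sampleQuery, 10);
  //     }
  //   });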
4519
4520 void tragicEvent(Throwable tragedy, String location) throws IOException {
    // unbox our internal AbortingException
4523 if (tragedy instanceof AbortingException) {
4524 tragedy = tragedy.getCause();
4525 }
4526
    // This is not supposed to be tragic: IW is supposed to catch this and
    // ignore, because it means we asked the merge to abort:
4529 assert tragedy instanceof MergePolicy.MergeAbortedException == false;
    // We cannot hold IW's lock here else it can lead to deadlock:
4532 assert Thread.holdsLock(this) == false;
    // How can it be a tragedy when nothing happened?
4535 assert tragedy != null;
4536
4537 if (infoStream.isEnabled("IW")) {
4538 infoStream.message("IW", "hit tragic " + tragedy.getClass().getSimpleName() + " inside " + location);
4539 }
4540
4541 synchronized (this) {
      // It's possible you could have a really bad day
4543 if (this.tragedy != null) {
        // Another thread is already dealing / has dealt with the tragedy:
4545 IOUtils.reThrow(tragedy);
4546 }
4547
4548 this.tragedy = tragedy;
4549 }
    // if we are already closed (e.g. called by rollback), this will be a
    // no-op.
4552 if (shouldClose(false)) {
4553 rollbackInternal();
4554 }
4555
4556 IOUtils.reThrow(tragedy);
4557 }
4558
  /** If this {@code IndexWriter} was closed as a side-effect of a tragic
   *  exception, e.g. disk full while flushing a new segment, this returns
   *  the root cause exception.  Otherwise (no tragic exception has
   *  occurred) it returns null. */
4562 public Throwable getTragicException() {
4563 return tragedy;
4564 }
4565
  /** Returns {@code true} if this {@code IndexWriter} is still open. */
4567 public boolean isOpen() {
4568 return closing == false && closed == false;
4569 }
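  // Illustrative sketch: after an unexpected close, distinguish a tragic
  // failure from a normal close() before deciding to re-open ("writer" is an
  // assumed IndexWriter):
  //
  //   if (writer.isOpen() == false) {
  //     Throwable cause = writer.getTragicException();
  //     if (cause != null) {
  //       // e.g. disk full / OOM: log it and rebuild the writer
  //     }
  //   }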
4570
  // Used for testing.  Current points:
  //   startDoFlush
  //   startCommitMerge
  //   startStartCommit
  //   midStartCommit
  //   midStartCommit2
  //   midStartCommitSuccess
  //   finishStartCommit
  //   startCommitMergeDeletes
  //   startMergeInit
4582 private final void testPoint(String message) {
4583 if (enableTestPoints) {
4584 assert infoStream.isEnabled("TP");
4585 infoStream.message("TP", message);
4586 }
4587 }
4588
4589 synchronized boolean nrtIsCurrent(SegmentInfos infos) {
4590
4591 ensureOpen();
4592 boolean isCurrent = infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedUpdatesStream.any();
4593 if (infoStream.isEnabled("IW")) {
4594 if (isCurrent == false) {
4595 infoStream.message("IW", "nrtIsCurrent: infoVersion matches: " + (infos.version == segmentInfos.version) + "; DW changes: " + docWriter.anyChanges() + "; BD changes: "+ bufferedUpdatesStream.any());
4596 }
4597 }
4598 return isCurrent;
4599 }
4600
4601 synchronized boolean isClosed() {
4602 return closed;
4603 }
4604
  /** Expert: remove any index files that are no longer used.
   *
   *  <p>IndexWriter normally deletes unused files itself, during indexing.
   *  However, on Windows, which disallows deletion of open files, if there
   *  is a reader open on the index then those files cannot be deleted.
   *  This is fine, because IndexWriter will periodically retry the
   *  deletion.</p>
   *
   *  <p>However, IndexWriter doesn't try that often: only on open, close,
   *  flushing a new segment, and finishing a merge.  If you don't do any of
   *  these actions with your IndexWriter, you'll see the unused files
   *  linger.  If that's a problem, call this method to delete them (once
   *  you've closed the open readers that were preventing their
   *  deletion).</p>
   *
   *  <p>In addition, you can call this method to delete unreferenced index
   *  commits.  This might be useful if you are using an
   *  {@link IndexDeletionPolicy} which holds onto index commits until some
   *  criteria are met, but those commits are no longer needed.  Otherwise,
   *  those commits will be deleted the next time commit() is called. */
4630 public synchronized void deleteUnusedFiles() throws IOException {
4631 ensureOpen(false);
4632 deleter.deletePendingFiles();
4633 deleter.revisitPolicy();
4634 }
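  // Illustrative sketch: after closing a long-lived reader on Windows (where
  // open files cannot be deleted), ask the writer to retry the deletions.
  // "reader" is an assumed open DirectoryReader on the same index:
  //
  //   reader.close();
  //   writer.deleteUnusedFiles();   // stale files can now actually be removed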
4635
4636 private synchronized void deletePendingFiles() throws IOException {
4637 deleter.deletePendingFiles();
4638 }
4639
  /** NOTE: this method creates a compound file for all files returned by
   *  info.files().  While, generally, this may include separate norms and
   *  deletion files, this SegmentInfo must not reference such files when
   *  this method is called, because they are not allowed within a compound
   *  file. */
4646 final void createCompoundFile(InfoStream infoStream, TrackingDirectoryWrapper directory, final SegmentInfo info, IOContext context) throws IOException {
    // maybe this check is not needed, but why take the risk?
4649 if (!directory.getCreatedFiles().isEmpty()) {
4650 throw new IllegalStateException("pass a clean trackingdir for CFS creation");
4651 }
4652
4653 if (infoStream.isEnabled("IW")) {
4654 infoStream.message("IW", "create compound file");
4655 }
4656
4657 boolean success = false;
4658 try {
4659 info.getCodec().compoundFormat().write(directory, info, context);
4660 success = true;
4661 } finally {
4662 if (!success) {
        // Safe: these files must exist
4664 deleteNewFiles(directory.getCreatedFiles());
4665 }
4666 }
    // Replace all previous files with the CFS/CFE files:
4669 info.setFiles(new HashSet<>(directory.getCreatedFiles()));
4670 }
4671
  /** Tries to delete the given files if unreferenced.
   *
   * @param files the files to delete
   * @throws IOException if an {@link IOException} occurs
   * @see IndexFileDeleter#deleteNewFiles(Collection) */
4678 synchronized final void deleteNewFiles(Collection<String> files) throws IOException {
4679 deleter.deleteNewFiles(files);
4680 }
4681
  /** Cleans up residuals from a segment that could not be entirely flushed
   *  due to an error. */
4685 synchronized final void flushFailed(SegmentInfo info) throws IOException {
    // if we are hitting OOM we try to double-check if any files exist and
    // remove them
4687 Collection<String> files;
4688 try {
4689 files = info.files();
4690 } catch (IllegalStateException ise) {
      // OK
4692 files = null;
4693 }
4694 if (files != null) {
4695 deleter.deleteNewFiles(files);
4696 }
4697 }
4698
4699 final int purge(boolean forced) throws IOException {
4700 return docWriter.purgeBuffer(this, forced);
4701 }
4702
4703 final void applyDeletesAndPurge(boolean forcePurge) throws IOException {
4704 try {
4705 purge(forcePurge);
4706 } finally {
4707 if (applyAllDeletesAndUpdates()) {
4708 maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
4709 }
4710 flushCount.incrementAndGet();
4711 }
4712 }
4713
4714 final void doAfterSegmentFlushed(boolean triggerMerge, boolean forcePurge) throws IOException {
4715 try {
4716 purge(forcePurge);
4717 } finally {
4718 if (triggerMerge) {
4719 maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
4720 }
4721 }
4722 }
4723
4724 synchronized void incRefDeleter(SegmentInfos segmentInfos) throws IOException {
4725 ensureOpen();
4726 deleter.incRef(segmentInfos, false);
4727 if (infoStream.isEnabled("IW")) {
4728 infoStream.message("IW", "incRefDeleter for NRT reader version=" + segmentInfos.getVersion() + " segments=" + segString(segmentInfos));
4729 }
4730 }
4731
4732 synchronized void decRefDeleter(SegmentInfos segmentInfos) throws IOException {
4733 ensureOpen();
4734 deleter.decRef(segmentInfos);
4735 if (infoStream.isEnabled("IW")) {
4736 infoStream.message("IW", "decRefDeleter for NRT reader version=" + segmentInfos.getVersion() + " segments=" + segString(segmentInfos));
4737 }
4738 }
4739
4740 private boolean processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
4741 return processEvents(eventQueue, triggerMerge, forcePurge);
4742 }
4743
4744 private boolean processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
4745 boolean processed = false;
4746 if (tragedy == null) {
4747 Event event;
4748 while((event = queue.poll()) != null) {
4749 processed = true;
4750 event.process(this, triggerMerge, forcePurge);
4751 }
4752 }
4753 return processed;
4754 }
4755
  /**
   * Interface for internal atomic events.  See {@link DocumentsWriter} for
   * details.  Events are executed concurrently and no order is guaranteed.
   * Each event should only rely on the serializability within its process
   * method.  All actions that must happen before or after a certain action
   * must be encoded inside the
   * {@link #process(IndexWriter, boolean, boolean)} method.
   */
4762 static interface Event {
    /**
     * Processes the event.  This method is called by the {@link IndexWriter}
     * passed as the first argument.
     *
     * @param writer the {@link IndexWriter} that executes the event
     * @param triggerMerge {@code false} iff this event should not trigger
     *        any segment merges
     * @param clearBuffers {@code true} iff this event should clear all
     *        buffers associated with the event
     * @throws IOException if an {@link IOException} occurs
     */
4777 void process(IndexWriter writer, boolean triggerMerge, boolean clearBuffers) throws IOException;
4778 }
4779
  /** Used only by asserts: returns true if the file exists (can be opened),
   *  false if it cannot be opened, and (unlike Java's File.exists) throws
   *  IOException if there's some unexpected error. */
4784 static boolean slowFileExists(Directory dir, String fileName) throws IOException {
4785 try {
4786 dir.openInput(fileName, IOContext.DEFAULT).close();
4787 return true;
4788 } catch (NoSuchFileException | FileNotFoundException e) {
4789 return false;
4790 }
4791 }
4792
  /** Anything that will add N docs to the index should reserve first to
   *  make sure it's allowed. */
4796 private void reserveDocs(long addedNumDocs) {
4797 assert addedNumDocs >= 0;
4798 if (pendingNumDocs.addAndGet(addedNumDocs) > actualMaxDocs) {
      // Reserve failed: put the docs back and throw exc:
4800 pendingNumDocs.addAndGet(-addedNumDocs);
4801 tooManyDocs(addedNumDocs);
4802 }
4803 }
4804
  /** Does a best-effort check that the current index would accept this many
   *  additional docs, but does not actually reserve them.
   *
   * @throws IllegalArgumentException if there would be too many docs */
4808 private void testReserveDocs(long addedNumDocs) {
4809 assert addedNumDocs >= 0;
4810 if (pendingNumDocs.get() + addedNumDocs > actualMaxDocs) {
4811 tooManyDocs(addedNumDocs);
4812 }
4813 }
4814
4815 private void tooManyDocs(long addedNumDocs) {
4816 assert addedNumDocs >= 0;
4817 throw new IllegalArgumentException("number of documents in the index cannot exceed " + actualMaxDocs + " (current document count is " + pendingNumDocs.get() + "; added numDocs is " + addedNumDocs + ")");
4818 }
4819
  /** Wraps the incoming {@link Directory} so that we assign a per-thread
   *  MergeRateLimiter to all created {@link IndexOutput}s. */
4822 private Directory addMergeRateLimiters(Directory in) {
4823 return new FilterDirectory(in) {
4824 @Override
4825 public IndexOutput createOutput(String name, IOContext context) throws IOException {
4826 ensureOpen();
4827
4828
4829 IndexWriter.this.ensureOpen(false);
        // This Directory is only supposed to be used during merging,
        // so all writes should have MERGE context, else there is a bug
        // somewhere that is failing to pass down the right IOContext:
4834 assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
4835
4836 MergeRateLimiter rateLimiter = rateLimiters.get();
4837 assert rateLimiter != null;
4838
4839 return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
4840 }
4841 };
4842 }
4843 }